Merge branch 'master' into Follow_up_Backup_Restore_concurrency_check_node_2

SmitaRKulkarni 2023-04-13 09:46:36 +02:00 committed by GitHub
commit 6568c330c5
109 changed files with 1867 additions and 417 deletions

contrib/libhdfs3 (vendored): 2 changes

@ -1 +1 @@
Subproject commit 9ee3ce77215fca83b7fdfcfe2186a3db0d0bdb74
Subproject commit 3c91d96ff29fe5928f055519c6d979c4b104db9e

View File

@ -18,13 +18,13 @@ RUN apt-get update \
# and MEMORY_LIMIT_EXCEEDED exceptions in Functional tests (total memory limit in Functional tests is ~55.24 GiB).
# TSAN will flush shadow memory when reaching this limit.
# It may cause false-negatives, but it's better than OOM.
RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7 memory_limit_mb=46080'" >> /etc/environment
RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7 memory_limit_mb=46080 second_deadlock_stack=1'" >> /etc/environment
RUN echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment
RUN echo "MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'" >> /etc/environment
RUN echo "LSAN_OPTIONS='suppressions=/usr/share/clickhouse-test/config/lsan_suppressions.txt'" >> /etc/environment
# Sanitizer options for current shell (not current, but the one that will be spawned on "docker run")
# (but w/o verbosity for TSAN, otherwise test.reference will not match)
ENV TSAN_OPTIONS='halt_on_error=1 history_size=7 memory_limit_mb=46080'
ENV TSAN_OPTIONS='halt_on_error=1 history_size=7 memory_limit_mb=46080 second_deadlock_stack=1'
ENV UBSAN_OPTIONS='print_stacktrace=1'
ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'

View File

@ -108,6 +108,12 @@ RUN set -x \
&& echo 'dockremap:165536:65536' >> /etc/subuid \
&& echo 'dockremap:165536:65536' >> /etc/subgid
# Same options as in test/base/Dockerfile
# (in case you need to override them in tests)
ENV TSAN_OPTIONS='halt_on_error=1 history_size=7 memory_limit_mb=46080 second_deadlock_stack=1'
ENV UBSAN_OPTIONS='print_stacktrace=1'
ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'
EXPOSE 2375
ENTRYPOINT ["dockerd-entrypoint.sh"]
CMD ["sh", "-c", "pytest $PYTEST_OPTS"]

View File

@ -40,6 +40,39 @@ SETTINGS additional_table_filters = (('table_1', 'x != 2'))
└───┴──────┘
```
## additional_result_filter
An additional filter expression to apply to the result of a `SELECT` query.
This setting is not applied to any subquery.
Default value: `''`.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
│ 1 │ a │
│ 2 │ bb │
│ 3 │ ccc │
│ 4 │ dddd │
└───┴──────┘
```
```sql
SELECT *
FROM table_1
SETTINGS additional_result_filter = 'x != 2'
```
```response
┌─x─┬─y────┐
│ 1 │ a │
│ 3 │ ccc │
│ 4 │ dddd │
└───┴──────┘
```
## allow_nondeterministic_mutations {#allow_nondeterministic_mutations}
User-level setting that allows mutations on replicated tables to make use of non-deterministic functions such as `dictGet`.

View File

@ -645,7 +645,7 @@ For an alternative to `date\_diff`, see function `age`.
date_diff('unit', startdate, enddate, [timezone])
```
Aliases: `dateDiff`, `DATE_DIFF`.
Aliases: `dateDiff`, `DATE_DIFF`, `timestampDiff`, `timestamp_diff`, `TIMESTAMP_DIFF`.
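For example, the new aliases are interchangeable with `dateDiff` (a minimal sketch; the unit and dates are chosen only for illustration):

```sql
SELECT
    dateDiff('day', toDate('2023-01-01'), toDate('2023-01-11')) AS diff,        -- 10
    timestampDiff('day', toDate('2023-01-01'), toDate('2023-01-11')) AS alias;  -- 10
```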
**Arguments**

View File

@ -194,7 +194,14 @@ Accepts a number. If the number is less than one, it returns 0. Otherwise, it ro
## roundAge(num)
Accepts a number. If the number is less than 18, it returns 0. Otherwise, it rounds the number down to a number from the set: 18, 25, 35, 45, 55.
Accepts a number. If the number is
- smaller than 1, it returns 0,
- between 1 and 17, it returns 17,
- between 18 and 24, it returns 18,
- between 25 and 34, it returns 25,
- between 35 and 44, it returns 35,
- between 45 and 54, it returns 45,
- equal to or larger than 55, it returns 55.
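A quick sketch of the mapping (input values chosen only for illustration):

```sql
SELECT roundAge(0), roundAge(5), roundAge(20), roundAge(30), roundAge(40), roundAge(50), roundAge(60);
-- 0, 17, 18, 25, 35, 45, 55
```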
## roundDown(num, arr)

View File

@ -491,7 +491,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
{
/// Database or table could be replicated - so may use ZooKeeper. We need to retry.
auto zookeeper_retries_info = global_zookeeper_retries_info;
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", zookeeper_retries_info);
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); });
}
catch (Exception & e)

View File

@ -20,7 +20,7 @@ WithRetries::WithRetries(Poco::Logger * log_, zkutil::GetZooKeeper get_zookeeper
WithRetries::RetriesControlHolder::RetriesControlHolder(const WithRetries * parent, const String & name)
: info(parent->global_zookeeper_retries_info)
, retries_ctl(name, info)
, retries_ctl(name, info, nullptr)
, faulty_zookeeper(parent->getFaultyZooKeeper())
{}

View File

@ -138,6 +138,8 @@
M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \
M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \
M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \
M(QueryPipelineExecutorThreads, "Number of threads in the PipelineExecutor thread pool.") \
M(QueryPipelineExecutorThreadsActive, "Number of threads in the PipelineExecutor thread pool running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \

View File

@ -19,6 +19,7 @@
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
#include <base/find_symbols.h>
#include <base/scope_guard.h>
namespace DB

View File

@ -453,8 +453,8 @@ struct SettingFieldMultiEnum
explicit operator StorageType() const { return value.getValue(); }
explicit operator Field() const { return toString(); }
SettingFieldMultiEnum & operator= (StorageType x) { changed = x != value.getValue(); value.setValue(x); return *this; }
SettingFieldMultiEnum & operator= (ValueType x) { changed = !(x == value); value = x; return *this; }
SettingFieldMultiEnum & operator= (StorageType x) { changed = true; value.setValue(x); return *this; }
SettingFieldMultiEnum & operator= (ValueType x) { changed = true; value = x; return *this; }
SettingFieldMultiEnum & operator= (const Field & x) { parseFromString(x.safeGet<const String &>()); return *this; }
String toString() const

View File

@ -122,7 +122,7 @@ GTEST_TEST(SettingMySQLDataTypesSupport, SetString)
// comma with spaces
setting = " datetime64 , decimal ";
ASSERT_FALSE(setting.changed); // false since value is the same as previous one.
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
@ -136,7 +136,7 @@ GTEST_TEST(SettingMySQLDataTypesSupport, SetString)
ASSERT_EQ(Field("decimal"), setting);
setting = String(",decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,");
ASSERT_FALSE(setting.changed); //since previous value was DECIMAL
ASSERT_TRUE(setting.changed); //since previous value was DECIMAL
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
@ -163,7 +163,7 @@ GTEST_TEST(SettingMySQLDataTypesSupport, SetInvalidString)
ASSERT_EQ(0, setting.value.getValue());
EXPECT_NO_THROW(setting = String(", "));
ASSERT_FALSE(setting.changed);
ASSERT_TRUE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
}

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
static constexpr UInt32 max_scale = 9;
@ -56,4 +57,14 @@ SerializationPtr DataTypeDateTime64::doGetDefaultSerialization() const
return std::make_shared<SerializationDateTime64>(scale, *this);
}
std::string getDateTimeTimezone(const IDataType & data_type)
{
if (const auto * type = typeid_cast<const DataTypeDateTime *>(&data_type))
return type->hasExplicitTimeZone() ? type->getTimeZone().getTimeZone() : std::string();
if (const auto * type = typeid_cast<const DataTypeDateTime64 *>(&data_type))
return type->hasExplicitTimeZone() ? type->getTimeZone().getTimeZone() : std::string();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get time zone from type {}", data_type.getName());
}
}

View File

@ -41,5 +41,7 @@ protected:
SerializationPtr doGetDefaultSerialization() const override;
};
std::string getDateTimeTimezone(const IDataType & data_type);
}

View File

@ -556,6 +556,7 @@ inline bool isNullableOrLowCardinalityNullable(const DataTypePtr & data_type)
template <typename DataType> constexpr bool IsDataTypeDecimal = false;
template <typename DataType> constexpr bool IsDataTypeNumber = false;
template <typename DataType> constexpr bool IsDataTypeDateOrDateTime = false;
template <typename DataType> constexpr bool IsDataTypeDate = false;
template <typename DataType> constexpr bool IsDataTypeEnum = false;
template <typename DataType> constexpr bool IsDataTypeDecimalOrNumber = IsDataTypeDecimal<DataType> || IsDataTypeNumber<DataType>;
@ -576,6 +577,9 @@ template <> inline constexpr bool IsDataTypeDecimal<DataTypeDateTime64> = true;
template <typename T> constexpr bool IsDataTypeNumber<DataTypeNumber<T>> = true;
template <> inline constexpr bool IsDataTypeDate<DataTypeDate> = true;
template <> inline constexpr bool IsDataTypeDate<DataTypeDate32> = true;
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDate> = true;
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDate32> = true;
template <> inline constexpr bool IsDataTypeDateOrDateTime<DataTypeDateTime> = true;

View File

@ -115,10 +115,13 @@ void DDLLoadingDependencyVisitor::visit(const ASTStorage & storage, Data & data)
{
if (!storage.engine)
return;
if (storage.engine->name != "Dictionary")
return;
extractTableNameFromArgument(*storage.engine, data, 0);
if (storage.engine->name == "Distributed")
/// Checks that dict* expression was used as sharding_key and builds dependency between the dictionary and current table.
/// Distributed(logs, default, hits[, sharding_key[, policy_name]])
extractTableNameFromArgument(*storage.engine, data, 3);
else if (storage.engine->name == "Dictionary")
extractTableNameFromArgument(*storage.engine, data, 0);
}
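A hedged illustration of the DDL this hunk now handles: a `Distributed` table whose sharding key calls `dictGet`, which should now register a loading dependency on that dictionary (all cluster, table, and dictionary names below are hypothetical):

```sql
-- hypothetical names; the dictGet() sharding key (argument index 3) creates the dependency
CREATE TABLE default.dist_hits AS default.hits
ENGINE = Distributed(logs, default, hits,
    dictGet('default.shard_dict', 'shard', user_id));
```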
@ -131,7 +134,29 @@ void DDLLoadingDependencyVisitor::extractTableNameFromArgument(const ASTFunction
QualifiedTableName qualified_name;
const auto * arg = function.arguments->as<ASTExpressionList>()->children[arg_idx].get();
if (const auto * literal = arg->as<ASTLiteral>())
if (const auto * dict_function = arg->as<ASTFunction>())
{
if (!functionIsDictGet(dict_function->name))
return;
/// Get the dictionary name from `dict*` function.
const auto * literal_arg = dict_function->arguments->as<ASTExpressionList>()->children[0].get();
const auto * dictionary_name = literal_arg->as<ASTLiteral>();
if (!dictionary_name)
return;
if (dictionary_name->value.getType() != Field::Types::String)
return;
auto maybe_qualified_name = QualifiedTableName::tryParseFromString(dictionary_name->value.get<String>());
if (!maybe_qualified_name)
return;
qualified_name = std::move(*maybe_qualified_name);
}
else if (const auto * literal = arg->as<ASTLiteral>())
{
if (literal->value.getType() != Field::Types::String)
return;
@ -167,5 +192,4 @@ void DDLLoadingDependencyVisitor::extractTableNameFromArgument(const ASTFunction
}
data.dependencies.emplace(std::move(qualified_name));
}
}

View File

@ -5,6 +5,9 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
@ -81,9 +84,10 @@ struct ArrayAggregateResultImpl<ArrayElement, AggregateOperation::sum>
std::conditional_t<std::is_same_v<ArrayElement, Decimal64>, Decimal128,
std::conditional_t<std::is_same_v<ArrayElement, Decimal128>, Decimal128,
std::conditional_t<std::is_same_v<ArrayElement, Decimal256>, Decimal256,
std::conditional_t<std::is_same_v<ArrayElement, DateTime64>, Decimal128,
std::conditional_t<std::is_floating_point_v<ArrayElement>, Float64,
std::conditional_t<std::is_signed_v<ArrayElement>, Int64,
UInt64>>>>>>>>>>;
UInt64>>>>>>>>>>>;
};
template <typename ArrayElement, AggregateOperation operation>
@ -108,26 +112,53 @@ struct ArrayAggregateImpl
using Types = std::decay_t<decltype(types)>;
using DataType = typename Types::LeftType;
if constexpr (aggregate_operation == AggregateOperation::average || aggregate_operation == AggregateOperation::product)
if constexpr (!IsDataTypeDateOrDateTime<DataType>)
{
result = std::make_shared<DataTypeFloat64>();
if constexpr (aggregate_operation == AggregateOperation::average || aggregate_operation == AggregateOperation::product)
{
result = std::make_shared<DataTypeFloat64>();
return true;
return true;
}
else if constexpr (IsDataTypeNumber<DataType>)
{
using NumberReturnType = ArrayAggregateResult<typename DataType::FieldType, aggregate_operation>;
result = std::make_shared<DataTypeNumber<NumberReturnType>>();
return true;
}
else if constexpr (IsDataTypeDecimal<DataType>)
{
using DecimalReturnType = ArrayAggregateResult<typename DataType::FieldType, aggregate_operation>;
UInt32 scale = getDecimalScale(*expression_return);
result = std::make_shared<DataTypeDecimal<DecimalReturnType>>(DecimalUtils::max_precision<DecimalReturnType>, scale);
return true;
}
}
else if constexpr (IsDataTypeNumber<DataType>)
else if constexpr (aggregate_operation == AggregateOperation::max || aggregate_operation == AggregateOperation::min)
{
using NumberReturnType = ArrayAggregateResult<typename DataType::FieldType, aggregate_operation>;
result = std::make_shared<DataTypeNumber<NumberReturnType>>();
if constexpr (IsDataTypeDate<DataType>)
{
result = std::make_shared<DataType>();
return true;
}
else if constexpr (IsDataTypeDecimal<DataType> && !IsDataTypeDateOrDateTime<DataType>)
{
using DecimalReturnType = ArrayAggregateResult<typename DataType::FieldType, aggregate_operation>;
UInt32 scale = getDecimalScale(*expression_return);
result = std::make_shared<DataTypeDecimal<DecimalReturnType>>(DecimalUtils::max_precision<DecimalReturnType>, scale);
return true;
}
else if constexpr (!IsDataTypeDecimal<DataType>)
{
std::string timezone = getDateTimeTimezone(*expression_return);
result = std::make_shared<DataTypeDateTime>(timezone);
return true;
return true;
}
else
{
std::string timezone = getDateTimeTimezone(*expression_return);
UInt32 scale = getDecimalScale(*expression_return);
result = std::make_shared<DataTypeDateTime64>(scale, timezone);
return true;
}
}
return false;
@ -370,7 +401,8 @@ struct ArrayAggregateImpl
executeType<Decimal32>(mapped, offsets, res) ||
executeType<Decimal64>(mapped, offsets, res) ||
executeType<Decimal128>(mapped, offsets, res) ||
executeType<Decimal256>(mapped, offsets, res))
executeType<Decimal256>(mapped, offsets, res) ||
executeType<DateTime64>(mapped, offsets, res))
{
return res;
}
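A hedged illustration of what this hunk enables: `arrayMin`/`arrayMax` now keep the Date/DateTime type of their argument instead of rejecting it (values are illustrative; DateTime results use the server timezone):

```sql
SELECT
    arrayMin([toDate('2023-04-01'), toDate('2023-01-15')]) AS min_date,                          -- 2023-01-15 (Date)
    arrayMax([toDateTime('2023-01-15 12:00:00'), toDateTime('2023-04-01 00:00:00')]) AS max_ts;  -- 2023-04-01 00:00:00 (DateTime)
```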

View File

@ -35,10 +35,10 @@ struct ArrayDifferenceImpl
if (which.isUInt8() || which.isInt8())
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt16>());
if (which.isUInt16() || which.isInt16())
if (which.isUInt16() || which.isInt16() || which.isDate())
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt32>());
if (which.isUInt32() || which.isUInt64() || which.isInt32() || which.isInt64())
if (which.isUInt32() || which.isUInt64() || which.isInt32() || which.isInt64() || which.isDate32() || which.isDateTime())
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>());
if (which.isFloat32() || which.isFloat64())
@ -47,6 +47,14 @@ struct ArrayDifferenceImpl
if (which.isDecimal())
return std::make_shared<DataTypeArray>(expression_return);
if (which.isDateTime64())
{
UInt32 scale = getDecimalScale(*expression_return);
UInt32 precision = getDecimalPrecision(*expression_return);
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale));
}
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "arrayDifference cannot process values of type {}", expression_return->getName());
}
@ -146,7 +154,8 @@ struct ArrayDifferenceImpl
executeType<Decimal32, Decimal32>(mapped, array, res) ||
executeType<Decimal64, Decimal64>(mapped, array, res) ||
executeType<Decimal128, Decimal128>(mapped, array, res) ||
executeType<Decimal256, Decimal256>(mapped, array, res))
executeType<Decimal256, Decimal256>(mapped, array, res) ||
executeType<DateTime64, Decimal64>(mapped, array, res))
return res;
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected column for arrayDifference: {}", mapped->getName());
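Correspondingly, a sketch of `arrayDifference` on date and time arrays after this change (per the hunk, Date yields Int32 deltas in days and DateTime yields Int64 deltas in seconds; values are illustrative):

```sql
SELECT
    arrayDifference([toDate('2023-01-01'), toDate('2023-01-11')]) AS day_deltas,                              -- [0, 10]
    arrayDifference([toDateTime('2023-01-01 00:00:00'), toDateTime('2023-01-01 00:00:30')]) AS second_deltas; -- [0, 30]
```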

View File

@ -448,6 +448,11 @@ private:
REGISTER_FUNCTION(DateDiff)
{
factory.registerFunction<FunctionDateDiff<true>>({}, FunctionFactory::CaseInsensitive);
factory.registerAlias("date_diff", FunctionDateDiff<true>::name);
factory.registerAlias("DATE_DIFF", FunctionDateDiff<true>::name);
factory.registerAlias("timestampDiff", FunctionDateDiff<true>::name);
factory.registerAlias("timestamp_diff", FunctionDateDiff<true>::name);
factory.registerAlias("TIMESTAMP_DIFF", FunctionDateDiff<true>::name);
}
REGISTER_FUNCTION(TimeDiff)

View File

@ -112,5 +112,8 @@ REGISTER_FUNCTION(Trim)
factory.registerFunction<FunctionTrimLeft>();
factory.registerFunction<FunctionTrimRight>();
factory.registerFunction<FunctionTrimBoth>();
factory.registerAlias("ltrim", FunctionTrimLeft::name);
factory.registerAlias("rtrim", FunctionTrimRight::name);
factory.registerAlias("trim", FunctionTrimBoth::name);
}
}
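The new aliases simply map onto the existing `trimLeft`/`trimRight`/`trimBoth` functions; a minimal sketch:

```sql
SELECT
    ltrim('  abc  ') AS l,  -- 'abc  '
    rtrim('  abc  ') AS r,  -- '  abc'
    trim('  abc  ')  AS t;  -- 'abc'
```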

View File

@ -2516,8 +2516,21 @@ ActionsDAGPtr ActionsDAG::buildFilterActionsDAG(
FindOriginalNodeForOutputName::FindOriginalNodeForOutputName(const ActionsDAGPtr & actions_)
:actions(actions_)
{
for (const auto * node : actions->getOutputs())
index.emplace(node->result_name, node);
const auto & actions_outputs = actions->getOutputs();
for (const auto * output_node : actions_outputs)
{
/// find input node which refers to the output node
/// consider only aliases on the path
const auto * node = output_node;
while (node && node->type == ActionsDAG::ActionType::ALIAS)
{
/// alias has only one child
chassert(node->children.size() == 1);
node = node->children.front();
}
if (node && node->type == ActionsDAG::ActionType::INPUT)
index.emplace(output_node->result_name, node);
}
}
const ActionsDAG::Node * FindOriginalNodeForOutputName::find(const String & output_name)
@ -2526,17 +2539,36 @@ const ActionsDAG::Node * FindOriginalNodeForOutputName::find(const String & outp
if (it == index.end())
return nullptr;
/// find original(non alias) node it refers to
const ActionsDAG::Node * node = it->second;
while (node && node->type == ActionsDAG::ActionType::ALIAS)
return it->second;
}
FindAliasForInputName::FindAliasForInputName(const ActionsDAGPtr & actions_)
:actions(actions_)
{
const auto & actions_outputs = actions->getOutputs();
for (const auto * output_node : actions_outputs)
{
chassert(!node->children.empty());
node = node->children.front();
/// find input node which corresponds to alias
const auto * node = output_node;
while (node && node->type == ActionsDAG::ActionType::ALIAS)
{
/// alias has only one child
chassert(node->children.size() == 1);
node = node->children.front();
}
if (node && node->type == ActionsDAG::ActionType::INPUT)
/// node can have several aliases but we consider only the first one
index.emplace(node->result_name, output_node);
}
if (node && node->type != ActionsDAG::ActionType::INPUT)
}
const ActionsDAG::Node * FindAliasForInputName::find(const String & name)
{
const auto it = index.find(name);
if (it == index.end())
return nullptr;
return node;
return it->second;
}
}

View File

@ -410,7 +410,20 @@ class FindOriginalNodeForOutputName
public:
explicit FindOriginalNodeForOutputName(const ActionsDAGPtr & actions);
const ActionsDAG::Node* find(const String& output_name);
const ActionsDAG::Node * find(const String & output_name);
private:
ActionsDAGPtr actions;
NameToNodeIndex index;
};
class FindAliasForInputName
{
using NameToNodeIndex = std::unordered_map<std::string_view, const ActionsDAG::Node *>;
public:
explicit FindAliasForInputName(const ActionsDAGPtr & actions);
const ActionsDAG::Node * find(const String & name);
private:
ActionsDAGPtr actions;

View File

@ -919,15 +919,14 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
{
auto lock = getLock();
if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
auto disk_ptr = getDisk(cache_disk_name);
if (!disk_ptr)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' is not found", cache_disk_name);
auto lock = getLock();
if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
const auto * disk_object_storage_ptr = dynamic_cast<const DiskObjectStorage *>(disk_ptr.get());
if (!disk_object_storage_ptr)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' does not use cache", cache_disk_name);

View File

@ -437,7 +437,7 @@ Chunk DDLQueryStatusSource::generate()
{
auto retries_info = getRetriesInfo();
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info);
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
@ -477,7 +477,7 @@ Chunk DDLQueryStatusSource::generate()
bool finished_exists = false;
auto retries_info = getRetriesInfo();
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info);
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
finished_exists = context->getZooKeeper()->tryGet(fs::path(node_path) / "finished" / host_id, status_data);

View File

@ -34,8 +34,11 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDummy.h>
#include <Storages/IStorage.h>
#include <Analyzer/Utils.h>
@ -912,6 +915,46 @@ void addBuildSubqueriesForSetsStepIfNeeded(QueryPlan & query_plan,
addCreatingSetsStep(query_plan, std::move(subqueries_for_sets), planner_context->getQueryContext());
}
/// Support for `additional_result_filter` setting
void addAdditionalFilterStepIfNeeded(QueryPlan & query_plan,
const QueryNode & query_node,
const SelectQueryOptions & select_query_options,
PlannerContextPtr & planner_context
)
{
if (select_query_options.subquery_depth != 0)
return;
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
auto additional_result_filter_ast = parseAdditionalResultFilter(settings);
if (!additional_result_filter_ast)
return;
ColumnsDescription fake_column_descriptions;
NameSet fake_name_set;
for (const auto & column : query_node.getProjectionColumns())
{
fake_column_descriptions.add(ColumnDescription(column.name, column.type));
fake_name_set.emplace(column.name);
}
auto storage = std::make_shared<StorageDummy>(StorageID{"dummy", "dummy"}, fake_column_descriptions);
auto fake_table_expression = std::make_shared<TableNode>(std::move(storage), query_context);
auto filter_info = buildFilterInfo(additional_result_filter_ast, fake_table_expression, planner_context, std::move(fake_name_set));
if (!filter_info.actions || !query_plan.isInitialized())
return;
auto filter_step = std::make_unique<FilterStep>(query_plan.getCurrentDataStream(),
filter_info.actions,
filter_info.column_name,
filter_info.do_remove_column);
filter_step->setStepDescription("additional result filter");
query_plan.addStep(std::move(filter_step));
}
}
PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node,
@ -1410,6 +1453,9 @@ void Planner::buildPlanForQueryNode()
const auto & projection_analysis_result = expression_analysis_result.getProjection();
addExpressionStep(query_plan, projection_analysis_result.project_names_actions, "Project names", result_actions_to_execute);
}
// For additional_result_filter setting
addAdditionalFilterStepIfNeeded(query_plan, query_node, select_query_options, planner_context);
}
if (!select_query_options.only_analyze)

View File

@ -33,6 +33,9 @@
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
@ -383,46 +386,6 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
prewhere_outputs.insert(prewhere_outputs.end(), required_output_nodes.begin(), required_output_nodes.end());
}
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
const auto & query_context = planner_context->getQueryContext();
auto filter_query_tree = buildQueryTree(filter_expression, query_context);
QueryAnalysisPass query_analysis_pass(table_expression_query_info.table_expression);
query_analysis_pass.run(filter_query_tree, query_context);
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression_query_info.table_expression);
const auto table_expression_names = table_expression_data.getColumnNames();
NameSet table_expression_required_names_without_filter(table_expression_names.begin(), table_expression_names.end());
collectSourceColumns(filter_query_tree, planner_context);
collectSets(filter_query_tree, *planner_context);
auto filter_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Filter actions must return single output node. Actual {}",
expression_nodes.size());
auto & filter_actions_outputs = filter_actions_dag->getOutputs();
filter_actions_outputs = std::move(expression_nodes);
std::string filter_node_name = filter_actions_outputs[0]->result_name;
bool remove_filter_column = true;
for (const auto & filter_input_node : filter_actions_dag->getInputs())
if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
filter_actions_outputs.push_back(filter_input_node);
return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
}
FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
@ -434,7 +397,7 @@ FilterDAGInfo buildRowPolicyFilterIfNeeded(const StoragePtr & storage,
if (!row_policy_filter)
return {};
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info, planner_context);
return buildFilterInfo(row_policy_filter->expression, table_expression_query_info.table_expression, planner_context);
}
FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
@ -465,7 +428,48 @@ FilterDAGInfo buildCustomKeyFilterIfNeeded(const StoragePtr & storage,
*storage,
query_context);
return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info, planner_context);
return buildFilterInfo(parallel_replicas_custom_filter_ast, table_expression_query_info.table_expression, planner_context);
}
/// Apply filters from additional_table_filters setting
FilterDAGInfo buildAdditionalFiltersIfNeeded(const StoragePtr & storage,
const String & table_expression_alias,
SelectQueryInfo & table_expression_query_info,
PlannerContextPtr & planner_context)
{
const auto & query_context = planner_context->getQueryContext();
const auto & settings = query_context->getSettingsRef();
auto const & additional_filters = settings.additional_table_filters.value;
if (additional_filters.empty())
return {};
auto const & storage_id = storage->getStorageID();
ASTPtr additional_filter_ast;
for (size_t i = 0; i < additional_filters.size(); ++i)
{
const auto & tuple = additional_filters[i].safeGet<const Tuple &>();
auto const & table = tuple.at(0).safeGet<String>();
auto const & filter = tuple.at(1).safeGet<String>();
if (table == table_expression_alias ||
(table == storage_id.getTableName() && query_context->getCurrentDatabase() == storage_id.getDatabaseName()) ||
(table == storage_id.getFullNameNotQuoted()))
{
ParserExpression parser;
additional_filter_ast = parseQuery(
parser, filter.data(), filter.data() + filter.size(),
"additional filter", settings.max_query_size, settings.max_parser_depth);
break;
}
}
if (!additional_filter_ast)
return {};
table_expression_query_info.additional_filter_ast = additional_filter_ast;
return buildFilterInfo(additional_filter_ast, table_expression_query_info.table_expression, planner_context);
}
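Per the matching logic above, an entry in `additional_table_filters` can name the table expression by its alias, by its bare name in the current database, or by its fully qualified name; a hedged sketch reusing `table_1` from the docs example earlier in this commit:

```sql
-- matches through the alias 't'
SELECT * FROM table_1 AS t
SETTINGS additional_table_filters = (('t', 'x != 2'));
```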
JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
@ -696,6 +700,10 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
}
const auto & table_expression_alias = table_expression->getAlias();
auto additional_filters_info = buildAdditionalFiltersIfNeeded(storage, table_expression_alias, table_expression_query_info, planner_context);
add_filter(additional_filters_info, "additional filter");
from_stage = storage->getQueryProcessingStage(query_context, select_query_options.to_stage, storage_snapshot, table_expression_query_info);
storage->read(query_plan, columns_names, storage_snapshot, table_expression_query_info, query_context, from_stage, max_block_size, max_streams);

View File

@ -3,6 +3,8 @@
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -28,14 +30,19 @@
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/CollectTableExpressionData.h>
#include <Planner/CollectSets.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH;
extern const int INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH;
@ -416,4 +423,61 @@ SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const
return select_query_info;
}
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
const QueryTreeNodePtr & table_expression,
PlannerContextPtr & planner_context,
NameSet table_expression_required_names_without_filter)
{
const auto & query_context = planner_context->getQueryContext();
auto filter_query_tree = buildQueryTree(filter_expression, query_context);
QueryAnalysisPass query_analysis_pass(table_expression);
query_analysis_pass.run(filter_query_tree, query_context);
if (table_expression_required_names_without_filter.empty())
{
auto & table_expression_data = planner_context->getTableExpressionDataOrThrow(table_expression);
const auto & table_expression_names = table_expression_data.getColumnNames();
table_expression_required_names_without_filter.insert(table_expression_names.begin(), table_expression_names.end());
}
collectSourceColumns(filter_query_tree, planner_context);
collectSets(filter_query_tree, *planner_context);
auto filter_actions_dag = std::make_shared<ActionsDAG>();
PlannerActionsVisitor actions_visitor(planner_context, false /*use_column_identifier_as_action_node_name*/);
auto expression_nodes = actions_visitor.visit(filter_actions_dag, filter_query_tree);
if (expression_nodes.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Filter actions must return single output node. Actual {}",
expression_nodes.size());
auto & filter_actions_outputs = filter_actions_dag->getOutputs();
filter_actions_outputs = std::move(expression_nodes);
std::string filter_node_name = filter_actions_outputs[0]->result_name;
bool remove_filter_column = true;
for (const auto & filter_input_node : filter_actions_dag->getInputs())
if (table_expression_required_names_without_filter.contains(filter_input_node->result_name))
filter_actions_outputs.push_back(filter_input_node);
return {std::move(filter_actions_dag), std::move(filter_node_name), remove_filter_column};
}
ASTPtr parseAdditionalResultFilter(const Settings & settings)
{
const String & additional_result_filter = settings.additional_result_filter;
if (additional_result_filter.empty())
return {};
ParserExpression parser;
auto additional_result_filter_ast = parseQuery(
parser, additional_result_filter.data(), additional_result_filter.data() + additional_result_filter.size(),
"additional result filter", settings.max_query_size, settings.max_parser_depth);
return additional_result_filter_ast;
}
}

View File

@ -78,4 +78,12 @@ QueryTreeNodePtr buildSubqueryToReadColumnsFromTableExpression(const NamesAndTyp
SelectQueryInfo buildSelectQueryInfo(const QueryTreeNodePtr & query_tree, const PlannerContextPtr & planner_context);
/// Build filter for specific table_expression
FilterDAGInfo buildFilterInfo(ASTPtr filter_expression,
const QueryTreeNodePtr & table_expression,
PlannerContextPtr & planner_context,
NameSet table_expression_required_names_without_filter = {});
ASTPtr parseAdditionalResultFilter(const Settings & settings);
}

View File

@ -1,5 +1,7 @@
#include <IO/WriteBufferFromString.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTracker.h>
#include <Processors/Executors/PipelineExecutor.h>
@ -19,6 +21,12 @@
#endif
namespace CurrentMetrics
{
extern const Metric QueryPipelineExecutorThreads;
extern const Metric QueryPipelineExecutorThreadsActive;
}
namespace DB
{
@ -304,26 +312,23 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get());
tasks.fill(queue);
std::unique_lock lock{threads_mutex};
threads.reserve(num_threads);
if (num_threads > 1)
pool = std::make_unique<ThreadPool>(CurrentMetrics::QueryPipelineExecutorThreads, CurrentMetrics::QueryPipelineExecutorThreadsActive, num_threads);
}
void PipelineExecutor::spawnThreads()
{
while (auto slot = slots->tryAcquire())
{
std::unique_lock lock{threads_mutex};
size_t thread_num = threads.size();
size_t thread_num = threads++;
/// Count of threads in use should be updated for proper finish() condition.
/// NOTE: this will not decrease `use_threads` below initially granted count
tasks.upscale(thread_num + 1);
/// Start new thread
threads.emplace_back([this, thread_num, thread_group = CurrentThread::getGroup(), slot = std::move(slot)]
pool->scheduleOrThrowOnError([this, thread_num, thread_group = CurrentThread::getGroup(), slot = std::move(slot)]
{
/// ThreadStatus thread_status;
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
@ -347,23 +352,6 @@ void PipelineExecutor::spawnThreads()
}
}
void PipelineExecutor::joinThreads()
{
for (size_t thread_num = 0; ; thread_num++)
{
std::unique_lock lock{threads_mutex};
if (thread_num >= threads.size())
break;
if (threads[thread_num].joinable())
{
auto & thread = threads[thread_num];
lock.unlock(); // to avoid deadlock if thread we are going to join starts spawning threads
thread.join();
}
}
// NOTE: No races: all concurrent spawnThreads() calls are done from `threads`, but they're already joined.
}
void PipelineExecutor::executeImpl(size_t num_threads)
{
initializeExecution(num_threads);
@ -374,7 +362,8 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (!finished_flag)
{
finish();
joinThreads();
if (pool)
pool->wait();
}
);
@ -382,7 +371,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
{
spawnThreads(); // start at least one thread
tasks.processAsyncTasks();
joinThreads();
pool->wait();
}
else
{

View File

@ -3,11 +3,12 @@
#include <Processors/IProcessor.h>
#include <Processors/Executors/ExecutorTasks.h>
#include <Common/EventCounter.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ConcurrencyControl.h>
#include <queue>
#include <mutex>
#include <memory>
namespace DB
@ -69,8 +70,8 @@ private:
// Concurrency control related
ConcurrencyControl::AllocationPtr slots;
ConcurrencyControl::SlotPtr single_thread_slot; // slot for single-thread mode to work using executeStep()
std::mutex threads_mutex;
std::vector<ThreadFromGlobalPool> threads;
std::unique_ptr<ThreadPool> pool;
std::atomic_size_t threads = 0;
/// Flag that checks that initializeExecution was called.
bool is_execution_initialized = false;
@ -94,7 +95,6 @@ private:
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.
void spawnThreads();
void joinThreads();
/// Methods connected to execution.
void executeImpl(size_t num_threads);

View File

@ -79,27 +79,15 @@ void ExpressionStep::updateOutputStream()
if (!getDataStreamTraits().preserves_sorting)
return;
FindOriginalNodeForOutputName original_node_finder(actions_dag);
FindAliasForInputName alias_finder(actions_dag);
const auto & input_sort_description = getInputStreams().front().sort_description;
for (size_t i = 0, s = input_sort_description.size(); i < s; ++i)
{
const auto & desc = input_sort_description[i];
String alias;
const auto & origin_column = desc.column_name;
for (const auto & column : output_stream->header)
{
const auto * original_node = original_node_finder.find(column.name);
if (original_node && original_node->result_name == origin_column)
{
alias = column.name;
break;
}
}
if (alias.empty())
return;
output_stream->sort_description[i].column_name = alias;
const auto & original_column = input_sort_description[i].column_name;
const auto * alias_node = alias_finder.find(original_column);
if (alias_node)
output_stream->sort_description[i].column_name = alias_node->result_name;
}
}

View File

@ -109,27 +109,15 @@ void FilterStep::updateOutputStream()
if (!getDataStreamTraits().preserves_sorting)
return;
FindOriginalNodeForOutputName original_node_finder(actions_dag);
FindAliasForInputName alias_finder(actions_dag);
const auto & input_sort_description = getInputStreams().front().sort_description;
for (size_t i = 0, s = input_sort_description.size(); i < s; ++i)
{
const auto & desc = input_sort_description[i];
String alias;
const auto & origin_column = desc.column_name;
for (const auto & column : output_stream->header)
{
const auto * original_node = original_node_finder.find(column.name);
if (original_node && original_node->result_name == origin_column)
{
alias = column.name;
break;
}
}
if (alias.empty())
return;
output_stream->sort_description[i].column_name = alias;
const auto & original_column = input_sort_description[i].column_name;
const auto * alias_node = alias_finder.find(original_column);
if (alias_node)
output_stream->sort_description[i].column_name = alias_node->result_name;
}
}

View File

@ -236,11 +236,8 @@ void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind)
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = query_kind;
{
std::lock_guard lock(duplicated_part_uuids_mutex);
if (!duplicated_part_uuids.empty())
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
}
if (!duplicated_part_uuids.empty())
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
@ -471,7 +468,6 @@ bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
if (!duplicates.empty())
{
std::lock_guard lock(duplicated_part_uuids_mutex);
duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end());
return false;
}

View File

@ -255,7 +255,6 @@ private:
std::atomic<bool> got_duplicated_part_uuids{ false };
/// Parts uuids, collected from remote replicas
std::mutex duplicated_part_uuids_mutex;
std::vector<UUID> duplicated_part_uuids;
PoolMode pool_mode = PoolMode::GET_MANY;

View File

@ -9,6 +9,7 @@
#include <Common/SettingsChanges.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <DataTypes/DataTypeFactory.h>
#include <QueryPipeline/ProfileInfo.h>
#include <Interpreters/Context.h>

View File

@ -0,0 +1,55 @@
#include <Storages/MergeTree/AlterConversions.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
bool AlterConversions::columnHasNewName(const std::string & old_name) const
{
for (const auto & [new_name, prev_name] : rename_map)
{
if (old_name == prev_name)
return true;
}
return false;
}
std::string AlterConversions::getColumnNewName(const std::string & old_name) const
{
for (const auto & [new_name, prev_name] : rename_map)
{
if (old_name == prev_name)
return new_name;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} was not renamed", old_name);
}
bool AlterConversions::isColumnRenamed(const std::string & new_name) const
{
for (const auto & [name_to, name_from] : rename_map)
{
if (name_to == new_name)
return true;
}
return false;
}
/// Get column old name before rename (lookup by key in rename_map)
std::string AlterConversions::getColumnOldName(const std::string & new_name) const
{
for (const auto & [name_to, name_from] : rename_map)
{
if (name_to == new_name)
return name_from;
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} was not renamed", new_name);
}
}

View File

@ -14,11 +14,22 @@ namespace DB
/// part->getColumns() and storage->getColumns().
struct AlterConversions
{
struct RenamePair
{
std::string rename_to;
std::string rename_from;
};
/// Rename map new_name -> old_name
std::unordered_map<std::string, std::string> rename_map;
std::vector<RenamePair> rename_map;
bool isColumnRenamed(const std::string & new_name) const { return rename_map.count(new_name) > 0; }
std::string getColumnOldName(const std::string & new_name) const { return rename_map.at(new_name); }
/// Column was renamed (lookup by value in rename_map)
bool columnHasNewName(const std::string & old_name) const;
/// Get new name for column (lookup by value in rename_map)
std::string getColumnNewName(const std::string & old_name) const;
/// Whether this name is the new name of a column (lookup by key in rename_map)
bool isColumnRenamed(const std::string & new_name) const;
/// Get column old name before rename (lookup by key in rename_map)
std::string getColumnOldName(const std::string & new_name) const;
};
}

View File

@ -214,6 +214,11 @@ bool DataPartStorageOnDiskBase::isBroken() const
return volume->getDisk()->isBroken();
}
bool DataPartStorageOnDiskBase::isReadonly() const
{
return volume->getDisk()->isReadOnly();
}
void DataPartStorageOnDiskBase::syncRevision(UInt64 revision) const
{
volume->getDisk()->syncRevision(revision);
@ -685,6 +690,7 @@ void DataPartStorageOnDiskBase::clearDirectory(
request.emplace_back(fs::path(dir) / "default_compression_codec.txt", true);
request.emplace_back(fs::path(dir) / "delete-on-destroy.txt", true);
request.emplace_back(fs::path(dir) / "txn_version.txt", true);
request.emplace_back(fs::path(dir) / "metadata_version.txt", true);
disk->removeSharedFiles(request, !can_remove_shared_data, names_not_to_remove);
disk->removeDirectory(dir);

View File

@ -39,6 +39,7 @@ public:
bool supportZeroCopyReplication() const override;
bool supportParallelWrite() const override;
bool isBroken() const override;
bool isReadonly() const override;
void syncRevision(UInt64 revision) const override;
UInt64 getRevision() const override;
std::string getDiskPath() const override;

View File

@ -64,8 +64,9 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION = 8;
// Reserved for ALTER PRIMARY KEY
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 8;
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 9;
std::string getEndpointId(const std::string & node_id)
{
@ -121,7 +122,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
MergeTreePartInfo::fromPartName(part_name, data.format_version);
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION))});
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION))});
LOG_TRACE(log, "Sending part {}", part_name);
@ -282,6 +283,10 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
&& name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
continue;
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION
&& name == IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)
continue;
files_to_replicate.insert(name);
}
@ -409,7 +414,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION)},
{"compress", "false"}
});
@ -709,7 +714,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
auto block = block_in.read();
throttler->add(block.bytes());
new_data_part->setColumns(block.getNamesAndTypesList(), {});
new_data_part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());
if (!is_projection)
{
@ -785,7 +790,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&
file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
}

View File

@ -150,6 +150,7 @@ public:
virtual bool supportZeroCopyReplication() const { return false; }
virtual bool supportParallelWrite() const = 0;
virtual bool isBroken() const = 0;
virtual bool isReadonly() const = 0;
/// TODO: remove or at least remove const.
virtual void syncRevision(UInt64 revision) const = 0;

View File

@ -418,10 +418,11 @@ std::pair<time_t, time_t> IMergeTreeDataPart::getMinMaxTime() const
}
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos)
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos, int32_t metadata_version_)
{
columns = new_columns;
serialization_infos = new_infos;
metadata_version = metadata_version_;
column_name_to_position.clear();
column_name_to_position.reserve(new_columns.size());
@ -662,6 +663,7 @@ void IMergeTreeDataPart::appendFilesOfColumnsChecksumsIndexes(Strings & files, b
appendFilesOfPartitionAndMinMaxIndex(files);
appendFilesOfTTLInfos(files);
appendFilesOfDefaultCompressionCodec(files);
appendFilesOfMetadataVersion(files);
}
if (!parent_part && include_projection)
@ -800,6 +802,9 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
if (getDataPartStorage().exists(TXN_VERSION_METADATA_FILE_NAME))
result.emplace(TXN_VERSION_METADATA_FILE_NAME);
if (getDataPartStorage().exists(METADATA_VERSION_FILE_NAME))
result.emplace(METADATA_VERSION_FILE_NAME);
return result;
}
@ -973,11 +978,22 @@ void IMergeTreeDataPart::removeVersionMetadata()
getDataPartStorage().removeFileIfExists("txn_version.txt");
}
void IMergeTreeDataPart::removeMetadataVersion()
{
getDataPartStorage().removeFileIfExists(METADATA_VERSION_FILE_NAME);
}
void IMergeTreeDataPart::appendFilesOfDefaultCompressionCodec(Strings & files)
{
files.push_back(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
}
void IMergeTreeDataPart::appendFilesOfMetadataVersion(Strings & files)
{
files.push_back(METADATA_VERSION_FILE_NAME);
}
CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
/// In memory parts doesn't have any compression
@ -1290,8 +1306,9 @@ void IMergeTreeDataPart::loadColumns(bool require)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
NamesAndTypesList loaded_columns;
bool exists = metadata_manager->exists("columns.txt");
if (!exists)
bool is_readonly_storage = getDataPartStorage().isReadonly();
if (!metadata_manager->exists("columns.txt"))
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::Compact)
@ -1306,7 +1323,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
if (columns.empty())
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No columns in part {}", name);
writeColumns(loaded_columns, {});
if (!is_readonly_storage)
writeColumns(loaded_columns, {});
}
else
{
@ -1324,16 +1342,35 @@ void IMergeTreeDataPart::loadColumns(bool require)
};
SerializationInfoByName infos(loaded_columns, settings);
exists = metadata_manager->exists(SERIALIZATION_FILE_NAME);
if (exists)
if (metadata_manager->exists(SERIALIZATION_FILE_NAME))
{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
}
setColumns(loaded_columns, infos);
int32_t loaded_metadata_version;
if (metadata_manager->exists(METADATA_VERSION_FILE_NAME))
{
auto in = metadata_manager->read(METADATA_VERSION_FILE_NAME);
readIntText(loaded_metadata_version, *in);
}
else
{
loaded_metadata_version = metadata_snapshot->getMetadataVersion();
if (!is_readonly_storage)
{
writeMetadata(METADATA_VERSION_FILE_NAME, {}, [loaded_metadata_version](auto & buffer)
{
writeIntText(loaded_metadata_version, buffer);
});
}
}
setColumns(loaded_columns, infos, loaded_metadata_version);
}
/// Project part / part with project parts / compact part doesn't support LWD.
bool IMergeTreeDataPart::supportLightweightDeleteMutate() const
{

View File

@ -137,7 +137,11 @@ public:
String getTypeName() const { return getType().toString(); }
void setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos);
/// We could have separate method like setMetadata, but it's much more convenient to set it up with columns
void setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos, int32_t metadata_version_);
/// Version of metadata for part (columns, pk and so on)
int32_t getMetadataVersion() const { return metadata_version; }
const NamesAndTypesList & getColumns() const { return columns; }
const ColumnsDescription & getColumnsDescription() const { return columns_description; }
@ -312,6 +316,9 @@ public:
mutable VersionMetadata version;
/// Version of part metadata (columns, pk and so on). Managed properly only for replicated merge tree.
int32_t metadata_version;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
@ -383,8 +390,12 @@ public:
/// (number of rows, number of rows with default values, etc).
static inline constexpr auto SERIALIZATION_FILE_NAME = "serialization.json";
/// Version used for transactions.
static inline constexpr auto TXN_VERSION_METADATA_FILE_NAME = "txn_version.txt";
static inline constexpr auto METADATA_VERSION_FILE_NAME = "metadata_version.txt";
/// One of part files which is used to check how many references (I'd like
/// to say hardlinks, but it will confuse even more) we have for the part
/// for zero copy replication. Sadly it's very complex.
@ -447,7 +458,11 @@ public:
void writeDeleteOnDestroyMarker();
void removeDeleteOnDestroyMarker();
/// It may look like a stupid joke. but these two methods are absolutely unrelated.
/// This one is about removing file with metadata about part version (for transactions)
void removeVersionMetadata();
/// This one is about removing file with version of part's metadata (columns, pk and so on)
void removeMetadataVersion();
mutable std::atomic<DataPartRemovalState> removal_state = DataPartRemovalState::NOT_ATTEMPTED;
@ -586,6 +601,8 @@ private:
static void appendFilesOfDefaultCompressionCodec(Strings & files);
static void appendFilesOfMetadataVersion(Strings & files);
/// Found column without specific compression and return codec
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;

View File

@ -246,7 +246,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
}
}
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos);
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos, global_ctx->metadata_snapshot->getMetadataVersion());
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)

View File

@ -4497,6 +4497,11 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExistsUnlocked(const MergeTre
static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
{
/// Remove metadata version file and take it from table.
/// Currently we cannot attach parts with different schema, so
/// we can assume that it's equal to table's current schema.
part->removeMetadataVersion();
part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = part->getDataPartStorage().getLastModified().epochTime();
part->removeDeleteOnDestroyMarker();
@ -7744,15 +7749,23 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
AlterConversions MergeTreeData::getAlterConversionsForPart(const MergeTreeDataPartPtr part) const
{
MutationCommands commands = getFirstAlterMutationCommandsForPart(part);
std::map<int64_t, MutationCommands> commands_map = getAlterMutationCommandsForPart(part);
AlterConversions result{};
for (const auto & command : commands)
/// Currently we need explicit conversions only for RENAME alter
/// all other conversions can be deduced from diff between part columns
/// and columns in storage.
if (command.type == MutationCommand::Type::RENAME_COLUMN)
result.rename_map[command.rename_to] = command.column_name;
auto & rename_map = result.rename_map;
for (const auto & [version, commands] : commands_map)
{
for (const auto & command : commands)
{
/// Currently we need explicit conversions only for RENAME alter
/// all other conversions can be deduced from diff between part columns
/// and columns in storage.
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
rename_map.emplace_back(AlterConversions::RenamePair{command.rename_to, command.column_name});
}
}
}
return result;
}
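The switch from a name-keyed map to version-ordered rename pairs matters when a column is renamed more than once across mutations; a hedged SQL sketch of such a sequence (table and column names are hypothetical):

```sql
-- hypothetical table; the two renames must be applied in mutation order
ALTER TABLE t RENAME COLUMN a TO b;
ALTER TABLE t RENAME COLUMN b TO c;
```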
@ -8158,7 +8171,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
if (settings->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4();
new_data_part->setColumns(columns, {});
new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->partition = partition;

View File

@ -1310,7 +1310,7 @@ protected:
/// Used to receive AlterConversions for part and apply them on fly. This
/// method has different implementations for replicated and non replicated
/// MergeTree because they store mutations in different way.
virtual MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
/// Moves part to specified space, used in ALTER ... MOVE ... queries
MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space);

View File

@ -73,7 +73,7 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
new_data_part_storage->beginTransaction();
new_data_part->uuid = uuid;
new_data_part->setColumns(columns, {});
new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->partition.value = partition.value;
new_data_part->minmax_idx = minmax_idx;
@ -104,7 +104,7 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
.build();
new_projection_part->is_temp = false; // clean up will be done on parent part
new_projection_part->setColumns(projection->getColumns(), {});
new_projection_part->setColumns(projection->getColumns(), {}, metadata_snapshot->getMetadataVersion());
auto new_projection_part_storage = new_projection_part->getDataPartStoragePtr();
if (new_projection_part_storage->exists())

View File

@ -464,7 +464,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos);
new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
@ -586,7 +586,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos);
new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
if (new_data_part->isStoredOnDisk())
{

View File

@ -106,6 +106,15 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
// representation.
PODArray<MarkInCompressedFile> plain_marks(marks_count * columns_in_mark); // temporary
if (file_size == 0 && marks_count != 0)
{
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Empty marks file '{}': {}, must be: {}",
std::string(fs::path(data_part_storage->getFullPath()) / mrk_path),
file_size, expected_uncompressed_size);
}
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
@ -148,7 +157,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
}
if (i * mark_size != expected_uncompressed_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all marks from file {}", mrk_path);
{
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all marks from file {}, marks expected {} (bytes size {}), marks read {} (bytes size {})",
mrk_path, marks_count, expected_uncompressed_size, i, reader->count());
}
}
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);

View File

@ -230,7 +230,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
part->minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context);
part->setColumns(block.getNamesAndTypesList(), {});
part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());
if (metadata_snapshot->hasSortingKey())
metadata_snapshot->getSortingKey().expression->execute(block);

View File

@ -176,7 +176,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
serialization_infos.replaceData(new_serialization_infos);
files_to_remove_after_sync = removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
new_part->setColumns(part_columns, serialization_infos);
new_part->setColumns(part_columns, serialization_infos, metadata_snapshot->getMetadataVersion());
}
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, files_to_remove_after_sync, sync);
@ -290,6 +290,14 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
written_files.emplace_back(std::move(out));
}
{
/// Write a file with a description of columns.
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, write_settings);
DB::writeIntText(new_part->getMetadataVersion(), *out);
out->preFinalize();
written_files.emplace_back(std::move(out));
}
if (default_codec != nullptr)
{
auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings);

View File

@ -85,7 +85,7 @@ MergedColumnOnlyOutputStream::fillChecksums(
all_checksums.files.erase(removed_file);
}
new_part->setColumns(columns, serialization_infos);
new_part->setColumns(columns, serialization_infos, metadata_snapshot->getMetadataVersion());
return checksums;
}

View File

@ -54,7 +54,7 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis
* The first part should be executed by the mutations interpreter.
* The other is just simple drops/renames, so it can be executed without the interpreter.
*/
static void splitMutationCommands(
static void splitAndModifyMutationCommands(
MergeTreeData::DataPartPtr part,
const MutationCommands & commands,
MutationCommands & for_interpreter,
@ -64,7 +64,7 @@ static void splitMutationCommands(
if (!isWidePart(part) || !isFullPartStorage(part->getDataPartStorage()))
{
NameSet mutated_columns;
NameSet mutated_columns, dropped_columns;
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
@ -98,26 +98,63 @@ static void splitMutationCommands(
}
else
mutated_columns.emplace(command.column_name);
}
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
for_interpreter.push_back(
{
.type = MutationCommand::Type::READ_COLUMN,
.column_name = command.rename_to,
});
part_columns.rename(command.column_name, command.rename_to);
if (command.type == MutationCommand::Type::DROP_COLUMN)
dropped_columns.emplace(command.column_name);
}
}
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands; instead we take them from rename_map.
/// It's important because required renames depend not only on the part's data version (i.e. mutation version)
/// but also on the part's metadata version. Why do we have such logic only for renames? Because all other types of alters
/// can be deduced based on the difference between the part's schema and the table schema.
for (const auto & [rename_to, rename_from] : alter_conversions.rename_map)
{
if (part_columns.has(rename_from))
{
/// Actual rename
for_interpreter.push_back(
{
.type = MutationCommand::Type::READ_COLUMN,
.column_name = rename_to,
});
/// Not needed for compact parts (not executed), added here only to produce correct
/// set of columns for new part and their serializations
for_file_renames.push_back(
{
.type = MutationCommand::Type::RENAME_COLUMN,
.column_name = rename_from,
.rename_to = rename_to
});
part_columns.rename(rename_from, rename_to);
}
}
/// If it's a compact part, we don't need to actually remove files
/// from disk; we just don't read dropped columns
for (const auto & column : part->getColumns())
for (const auto & column : part_columns)
{
if (!mutated_columns.contains(column.name))
{
for_interpreter.emplace_back(
MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type});
}
else if (dropped_columns.contains(column.name))
{
/// Not needed for compact parts (not executed), added here only to produce correct
/// set of columns for new part and their serializations
for_file_renames.push_back(
{
.type = MutationCommand::Type::DROP_COLUMN,
.column_name = column.name,
});
}
}
}
else
@ -149,9 +186,21 @@ static void splitMutationCommands(
for_file_renames.push_back(command);
}
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands; instead we take them from rename_map.
/// It's important because required renames depend not only on the part's data version (i.e. mutation version)
/// but also on the part's metadata version. Why do we have such logic only for renames? Because all other types of alters
/// can be deduced based on the difference between the part's schema and the table schema.
for (const auto & [rename_to, rename_from] : alter_conversions.rename_map)
{
for_file_renames.push_back({.type = MutationCommand::Type::RENAME_COLUMN, .column_name = rename_from, .rename_to = rename_to});
}
}
}
/// Get the columns list of the resulting part in the same order as storage_columns.
static std::pair<NamesAndTypesList, SerializationInfoByName>
getColumnsForNewDataPart(
@ -159,8 +208,13 @@ getColumnsForNewDataPart(
const Block & updated_header,
NamesAndTypesList storage_columns,
const SerializationInfoByName & serialization_infos,
const MutationCommands & commands_for_interpreter,
const MutationCommands & commands_for_removes)
{
MutationCommands all_commands;
all_commands.insert(all_commands.end(), commands_for_interpreter.begin(), commands_for_interpreter.end());
all_commands.insert(all_commands.end(), commands_for_removes.begin(), commands_for_removes.end());
NameSet removed_columns;
NameToNameMap renamed_columns_to_from;
NameToNameMap renamed_columns_from_to;
@ -176,8 +230,7 @@ getColumnsForNewDataPart(
storage_columns.emplace_back(column);
}
/// All commands are validated in AlterCommand so we don't care about order
for (const auto & command : commands_for_removes)
for (const auto & command : all_commands)
{
if (command.type == MutationCommand::UPDATE)
{
@ -192,10 +245,14 @@ getColumnsForNewDataPart(
/// If we don't have this column in the source part, then we don't need to materialize it
if (!part_columns.has(command.column_name))
{
continue;
}
if (command.type == MutationCommand::DROP_COLUMN)
{
removed_columns.insert(command.column_name);
}
if (command.type == MutationCommand::RENAME_COLUMN)
{
@ -294,20 +351,38 @@ getColumnsForNewDataPart(
/// whether its previous version should be dropped or removed
if (renamed_columns_to_from.contains(it->name) && !was_renamed && !was_removed)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect mutation commands, trying to rename column {} to {}, "
"but part {} already has column {}",
renamed_columns_to_from[it->name], it->name, source_part->name, it->name);
ErrorCodes::LOGICAL_ERROR,
"Incorrect mutation commands, trying to rename column {} to {}, "
"but part {} already has column {}",
renamed_columns_to_from[it->name], it->name, source_part->name, it->name);
/// Column was renamed and no other column was renamed to its name,
/// or the column was dropped.
if (!renamed_columns_to_from.contains(it->name) && (was_renamed || was_removed))
{
it = storage_columns.erase(it);
}
else
{
/// Take a type from source part column.
/// It may differ from column type in storage.
it->type = source_col->second;
if (was_removed)
{ /// DROP COLUMN xxx, RENAME COLUMN yyy TO xxx
auto renamed_from = renamed_columns_to_from.at(it->name);
auto maybe_name_and_type = source_columns.tryGetByName(renamed_from);
if (!maybe_name_and_type)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Got incorrect mutation commands, column {} was renamed from {}, but it doesn't exist in source columns {}",
it->name, renamed_from, source_columns.toString());
it->type = maybe_name_and_type->type;
}
else
{
/// Take a type from source part column.
/// It may differ from column type in storage.
it->type = source_col->second;
}
++it;
}
}
@ -573,6 +648,13 @@ static NameToNameVector collectFilesForRenames(
/// Collect counts for shared streams of different columns. As an example, Nested columns have a shared stream with array sizes.
auto stream_counts = getStreamCounts(source_part, source_part->getColumns().getNames());
NameToNameVector rename_vector;
NameSet collected_names;
auto add_rename = [&rename_vector, &collected_names] (const std::string & file_rename_from, const std::string & file_rename_to)
{
if (collected_names.emplace(file_rename_from).second)
rename_vector.emplace_back(file_rename_from, file_rename_to);
};
/// Remove old data
for (const auto & command : commands_for_removes)
@ -581,19 +663,19 @@ static NameToNameVector collectFilesForRenames(
{
if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx2"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx2", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
add_rename(INDEX_FILE_PREFIX + command.column_name + ".idx2", "");
add_rename(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
}
else if (source_part->checksums.has(INDEX_FILE_PREFIX + command.column_name + ".idx"))
{
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + ".idx", "");
rename_vector.emplace_back(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
add_rename(INDEX_FILE_PREFIX + command.column_name + ".idx", "");
add_rename(INDEX_FILE_PREFIX + command.column_name + mrk_extension, "");
}
}
else if (command.type == MutationCommand::Type::DROP_PROJECTION)
{
if (source_part->checksums.has(command.column_name + ".proj"))
rename_vector.emplace_back(command.column_name + ".proj", "");
add_rename(command.column_name + ".proj", "");
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
@ -603,8 +685,8 @@ static NameToNameVector collectFilesForRenames(
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
rename_vector.emplace_back(stream_name + ".bin", "");
rename_vector.emplace_back(stream_name + mrk_extension, "");
add_rename(stream_name + ".bin", "");
add_rename(stream_name + mrk_extension, "");
}
};
@ -623,8 +705,8 @@ static NameToNameVector collectFilesForRenames(
if (stream_from != stream_to)
{
rename_vector.emplace_back(stream_from + ".bin", stream_to + ".bin");
rename_vector.emplace_back(stream_from + mrk_extension, stream_to + mrk_extension);
add_rename(stream_from + ".bin", stream_to + ".bin");
add_rename(stream_from + mrk_extension, stream_to + mrk_extension);
}
};
@ -644,8 +726,8 @@ static NameToNameVector collectFilesForRenames(
{
if (!new_streams.contains(old_stream) && --stream_counts[old_stream] == 0)
{
rename_vector.emplace_back(old_stream + ".bin", "");
rename_vector.emplace_back(old_stream + mrk_extension, "");
add_rename(old_stream + ".bin", "");
add_rename(old_stream + mrk_extension, "");
}
}
}
@ -668,6 +750,7 @@ void finalizeMutatedPart(
ExecuteTTLType execute_ttl_type,
const CompressionCodecPtr & codec,
ContextPtr context,
StorageMetadataPtr metadata_snapshot,
bool sync)
{
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
@ -716,6 +799,12 @@ void finalizeMutatedPart(
written_files.push_back(std::move(out_comp));
}
{
auto out_metadata = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
written_files.push_back(std::move(out_metadata));
}
{
/// Write a file with a description of columns.
auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
@ -795,8 +884,6 @@ struct MutationContext
NamesAndTypesList storage_columns;
NameSet materialized_indices;
NameSet materialized_projections;
MutationsInterpreter::MutationKind::MutationKindEnum mutation_kind
= MutationsInterpreter::MutationKind::MutationKindEnum::MUTATE_UNKNOWN;
MergeTreeData::MutableDataPartPtr new_data_part;
IMergedBlockOutputStreamPtr out{nullptr};
@ -1353,13 +1440,27 @@ private:
ctx->new_data_part->storeVersionMetadata();
NameSet hardlinked_files;
/// NOTE: Renames must be done in order
for (const auto & [rename_from, rename_to] : ctx->files_to_rename)
{
if (rename_to.empty()) /// It's DROP COLUMN
{
/// pass
}
else
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), rename_from, rename_to);
hardlinked_files.insert(rename_from);
}
}
/// Create hardlinks for unchanged files
for (auto it = ctx->source_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (ctx->files_to_skip.contains(it->name()))
continue;
String destination;
String file_name = it->name();
auto rename_it = std::find_if(ctx->files_to_rename.begin(), ctx->files_to_rename.end(), [&file_name](const auto & rename_pair)
@ -1369,20 +1470,17 @@ private:
if (rename_it != ctx->files_to_rename.end())
{
if (rename_it->second.empty())
continue;
destination = rename_it->second;
}
else
{
destination = it->name();
/// RENAMEs and DROPs already processed
continue;
}
String destination = it->name();
if (it->isFile())
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), it->name(), destination);
hardlinked_files.insert(it->name());
ctx->source_part->getDataPartStorage(), file_name, destination);
hardlinked_files.insert(file_name);
}
else if (!endsWith(it->name(), ".tmp_proj")) // ignore projection tmp merge dir
{
@ -1478,7 +1576,7 @@ private:
}
}
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context, ctx->need_sync);
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context, ctx->metadata_snapshot, ctx->need_sync);
}
@ -1676,7 +1774,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("allow_asynchronous_read_from_io_pool_for_merge_tree", false);
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
MutationHelpers::splitMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames);
MutationHelpers::splitAndModifyMutationCommands(ctx->source_part, ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);
@ -1686,7 +1784,6 @@ bool MutateTask::prepare()
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind();
/// Always disable filtering in mutations: we want to read and write all rows because for updates we rewrite only some of the
/// columns and preserve the columns that are not affected, but after the update all columns must have the same number of rows.
ctx->interpreter->setApplyDeletedMask(false);
@ -1696,8 +1793,6 @@ bool MutateTask::prepare()
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);
/// FIXME new_data_part is not used in the case when we clone part with cloneAndLoadDataPartOnSameDisk and return false
/// Is it possible to handle this case earlier?
std::string prefix;
if (ctx->need_prefix)
@ -1721,9 +1816,9 @@ bool MutateTask::prepare()
auto [new_columns, new_infos] = MutationHelpers::getColumnsForNewDataPart(
ctx->source_part, ctx->updated_header, ctx->storage_columns,
ctx->source_part->getSerializationInfos(), ctx->commands_for_part);
ctx->source_part->getSerializationInfos(), ctx->for_interpreter, ctx->for_file_renames);
ctx->new_data_part->setColumns(new_columns, new_infos);
ctx->new_data_part->setColumns(new_columns, new_infos, ctx->metadata_snapshot->getMetadataVersion());
ctx->new_data_part->partition.assign(ctx->source_part->partition);
/// Don't change granularity type while mutating subset of columns
@ -1739,7 +1834,7 @@ bool MutateTask::prepare()
/// All columns from part are changed and may be some more that were missing before in part
/// TODO We can materialize compact part without copying data
if (!isWidePart(ctx->source_part) || !isFullPartStorage(ctx->source_part->getDataPartStorage())
|| (ctx->mutation_kind == MutationsInterpreter::MutationKind::MUTATE_OTHER && ctx->interpreter && ctx->interpreter->isAffectingAllColumns()))
|| (ctx->interpreter && ctx->interpreter->isAffectingAllColumns()))
{
task = std::make_unique<MutateAllPartColumnsTask>(ctx);
}
@ -1768,39 +1863,6 @@ bool MutateTask::prepare()
ctx->for_file_renames,
ctx->mrk_extension);
if (ctx->indices_to_recalc.empty() &&
ctx->projections_to_recalc.empty() &&
ctx->mutation_kind != MutationsInterpreter::MutationKind::MUTATE_OTHER
&& ctx->files_to_rename.empty())
{
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {} (optimized)", ctx->source_part->name, ctx->future_part->part_info.mutation);
/// new_data_part is not used here, another part is created instead (see the comment above)
ctx->temporary_directory_lock = {};
/// In zero-copy replication the checksums file path in s3 (blob path) is used for zero-copy locks in ZooKeeper. If we hardlink the checksums file, we will have the same blob path
/// and two different parts (the source and the new mutated part) will use the same locks in ZooKeeper. To avoid this we copy checksums.txt to generate a new blob path.
/// Example:
/// part: all_0_0_0/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
/// locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
/// ^ part name doesn't participate in the lock path
/// In case of full hardlink we will have:
/// part: all_0_0_0_1/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
/// locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
/// So we need to copy to have a new name
NameSet files_to_copy_instead_of_hardlinks;
auto settings_ptr = ctx->data->getSettings();
bool copy_checksums = ctx->data->supportsReplication() && settings_ptr->allow_remote_fs_zero_copy_replication && ctx->source_part->isStoredOnRemoteDiskWithZeroCopySupport();
if (copy_checksums)
files_to_copy_instead_of_hardlinks.insert(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false, files_to_copy_instead_of_hardlinks);
part->getDataPartStorage().beginTransaction();
ctx->temporary_directory_lock = std::move(lock);
promise.set_value(std::move(part));
return false;
}
task = std::make_unique<MutateSomePartColumnsTask>(ctx);
}
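
As a side note for readers skimming the MutateTask.cpp hunks above: the following is a minimal standalone sketch (not ClickHouse code; names and types are simplified assumptions) of the deduplication idea behind the new `add_rename` helper — only the first rename recorded for a given source file wins, so overlapping DROP/RENAME commands cannot emit duplicate entries for the same file.

```cpp
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

int main()
{
    std::vector<std::pair<std::string, std::string>> rename_vector;
    std::set<std::string> collected_names;

    auto add_rename = [&](const std::string & from, const std::string & to)
    {
        // emplace().second is true only for the first occurrence of `from`
        if (collected_names.emplace(from).second)
            rename_vector.emplace_back(from, to);
    };

    add_rename("col.bin", "");            // DROP COLUMN: empty destination means "remove the file"
    add_rename("col.bin", "col_new.bin"); // ignored, this source file was already collected

    for (const auto & [from, to] : rename_vector)
        std::cout << from << " -> " << (to.empty() ? std::string("<removed>") : to) << "\n";
    // prints: col.bin -> <removed>
}
```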

View File

@ -149,7 +149,7 @@ void ReplicatedMergeTreeAttachThread::runImpl()
const bool replica_metadata_version_exists = zookeeper->tryGet(replica_path + "/metadata_version", replica_metadata_version);
if (replica_metadata_version_exists)
{
storage.metadata_version = parse<int>(replica_metadata_version);
storage.setInMemoryMetadata(metadata_snapshot->withMetadataVersion(parse<int>(replica_metadata_version)));
}
else
{

View File

@ -11,6 +11,7 @@
#include <Parsers/formatAST.h>
#include <base/sort.h>
#include <ranges>
namespace DB
{
@ -1758,19 +1759,40 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
}
MutationCommands ReplicatedMergeTreeQueue::getFirstAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
std::lock_guard lock(state_mutex);
std::unique_lock lock(state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
if (in_partition == mutations_by_partition.end())
return MutationCommands{};
return {};
Int64 part_version = part->info.getDataVersion();
for (auto [mutation_version, mutation_status] : in_partition->second)
if (mutation_version > part_version && mutation_status->entry->alter_version != -1)
return mutation_status->entry->commands;
Int64 part_metadata_version = part->getMetadataVersion();
std::map<int64_t, MutationCommands> result;
/// Here we return mutation commands whose alter version is bigger than the part's metadata version.
/// Please note, we don't use getDataVersion() because these alter commands are used for in-fly conversions
/// of the part's metadata.
for (const auto & [mutation_version, mutation_status] : in_partition->second | std::views::reverse)
{
int32_t alter_version = mutation_status->entry->alter_version;
if (alter_version != -1)
{
if (alter_version > storage.getInMemoryMetadataPtr()->getMetadataVersion())
continue;
return MutationCommands{};
/// we take commands with bigger metadata version
if (alter_version > part_metadata_version)
{
result[mutation_version] = mutation_status->entry->commands;
}
else
{
/// Entries are ordered; we are processing them in reverse order, so we can break
break;
}
}
}
return result;
}
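
As a hedged illustration of the selection rule above (simplified, assumed types; not the real ReplicatedMergeTreeQueue), the sketch below walks mutations from newest to oldest and keeps only those whose alter version is already reflected in the table metadata but not yet in the part's metadata, stopping at the first entry the part has already applied.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <ranges>
#include <string>

int main()
{
    struct Entry { int32_t alter_version; std::string commands; };
    std::map<int64_t, Entry> mutations_in_partition = {
        {5, {3, "RENAME a TO b"}},
        {7, {4, "RENAME b TO c"}},
        {9, {6, "RENAME c TO d"}},
    };

    int32_t table_metadata_version = 4;  // the table is already at version 4
    int64_t part_metadata_version = 3;   // the part is still at version 3

    std::map<int64_t, std::string> result;
    for (const auto & [mutation_version, entry] : mutations_in_partition | std::views::reverse)
    {
        if (entry.alter_version == -1)
            continue;
        if (entry.alter_version > table_metadata_version)
            continue;                                     // the table has not applied this alter yet
        if (entry.alter_version > part_metadata_version)
            result[mutation_version] = entry.commands;    // the part needs this in-fly conversion
        else
            break;                                        // entries are ordered; older ones are already applied
    }

    for (const auto & [version, commands] : result)
        std::cout << version << ": " << commands << "\n"; // prints only mutation 7
}
```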
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
@ -1812,7 +1834,10 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
MutationCommands commands;
for (auto it = begin; it != end; ++it)
commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end());
{
const auto & commands_from_entry = it->second->entry->commands;
commands.insert(commands.end(), commands_from_entry.begin(), commands_from_entry.end());
}
return commands;
}
@ -2383,12 +2408,26 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
return {};
Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock);
Int64 max_version = in_partition->second.rbegin()->first;
Int64 max_version = in_partition->second.begin()->first;
int alter_version = -1;
bool barrier_found = false;
for (auto [mutation_version, mutation_status] : in_partition->second)
{
/// Some commands cannot stick together with other commands
if (mutation_status->entry->commands.containBarrierCommand())
{
/// We already collected some mutations; we don't want to stick them together with a barrier
if (max_version != mutation_version && max_version > current_version)
break;
/// This mutation is fresh, but it's a barrier; let's execute only it
if (mutation_version > current_version)
barrier_found = true;
}
max_version = mutation_version;
if (mutation_status->entry->isAlterMutation())
{
/// We want to assign mutations for part which version is bigger
@ -2401,6 +2440,9 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
break;
}
}
if (barrier_found)
break;
}
if (current_version >= max_version)

View File

@ -394,10 +394,10 @@ public:
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const;
/// Return mutation commands for part with smallest mutation version bigger
/// than data part version. Used when we apply alter commands on fly,
/// Return mutation commands which may not have been applied to the part yet,
/// according to the part's mutation version. Used when we apply alter commands on the fly,
/// without actual data modification on disk.
MutationCommands getFirstAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
/// Mark finished mutations as done. If the function needs to be called again at some later time
/// (because some mutations are probably done but we are not sure yet), returns true.

View File

@ -384,7 +384,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
* TODO Too complex logic, you can do better.
*/
size_t replicas_num = 0;
ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info);
ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
quorum_retries_ctl.retryLoop(
[&]()
{
@ -641,7 +641,7 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
Coordination::Error write_part_info_keeper_error = Coordination::Error::ZOK;
std::vector<String> conflict_block_ids;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info);
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
zookeeper->setKeeper(storage.getZooKeeper());
@ -1079,7 +1079,7 @@ std::vector<String> ReplicatedMergeTreeSinkImpl<async_insert>::commitPart(
if (isQuorumEnabled())
{
ZooKeeperRetriesControl quorum_retries_ctl("waitForQuorum", zookeeper_retries_info);
ZooKeeperRetriesControl quorum_retries_ctl("waitForQuorum", zookeeper_retries_info, context->getProcessListElement());
quorum_retries_ctl.retryLoop([&]()
{
if (storage.is_readonly)

View File

@ -1,4 +1,5 @@
#pragma once
#include <Interpreters/ProcessList.h>
#include <base/sleep.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h>
@ -35,7 +36,8 @@ struct ZooKeeperRetriesInfo
class ZooKeeperRetriesControl
{
public:
ZooKeeperRetriesControl(std::string name_, ZooKeeperRetriesInfo & retries_info_) : name(std::move(name_)), retries_info(retries_info_)
ZooKeeperRetriesControl(std::string name_, ZooKeeperRetriesInfo & retries_info_, QueryStatusPtr elem)
: name(std::move(name_)), retries_info(retries_info_), process_list_element(elem)
{
}
@ -166,6 +168,9 @@ private:
if (0 == iteration_count)
return true;
if (process_list_element && !process_list_element->checkTimeLimitSoft())
return false;
if (unconditional_retry)
{
unconditional_retry = false;
@ -266,6 +271,7 @@ private:
bool unconditional_retry = false;
bool iteration_succeeded = true;
bool stop_retries = false;
QueryStatusPtr process_list_element;
};
}
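
A minimal generic sketch of the new bail-out, under the assumption that a soft time-limit check is available as a callable (the real code consults process_list_element->checkTimeLimitSoft() between retries, as shown in the hunk above; the names below are purely illustrative).

```cpp
#include <functional>
#include <iostream>

bool retryLoop(unsigned max_retries,
               const std::function<bool()> & within_time_limit,  // assumption: soft check, false means "stop retrying"
               const std::function<bool()> & attempt)            // returns true on success
{
    for (unsigned iteration = 0; iteration < max_retries; ++iteration)
    {
        if (iteration > 0 && !within_time_limit())
            return false;   // analogous to checkTimeLimitSoft() failing between retries
        if (attempt())
            return true;
    }
    return false;
}

int main()
{
    unsigned calls = 0;
    bool ok = retryLoop(
        /*max_retries=*/10000,
        [] { return false; },              // pretend the query already exceeded max_execution_time
        [&] { ++calls; return false; });   // the operation itself keeps failing
    std::cout << ok << " after " << calls << " attempt(s)\n";  // prints: 0 after 1 attempt(s)
}
```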

View File

@ -70,8 +70,8 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2]
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]

View File

@ -23,6 +23,12 @@ namespace ErrorCodes
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
}
bool MutationCommand::isBarrierCommand() const
{
return type == RENAME_COLUMN;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool parse_alter_commands)
{
if (command->type == ASTAlterCommand::DELETE)
@ -212,4 +218,14 @@ bool MutationCommands::hasNonEmptyMutationCommands() const
return false;
}
bool MutationCommands::containBarrierCommand() const
{
for (const auto & command : *this)
{
if (command.isBarrierCommand())
return true;
}
return false;
}
}

View File

@ -67,6 +67,9 @@ struct MutationCommand
/// If parse_alter_commands, than consider more Alter commands as mutation commands
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool parse_alter_commands = false);
/// This command shouldn't stick with other commands
bool isBarrierCommand() const;
};
/// Multiple mutation commands, possible from different ALTER queries
@ -79,6 +82,11 @@ public:
void readText(ReadBuffer & in);
std::string toString() const;
bool hasNonEmptyMutationCommands() const;
/// This set of commands contains a barrier command and shouldn't
/// be batched together with other commands. Commands from one set have already been validated
/// to be executed without issues at the time of their creation.
bool containBarrierCommand() const;
};
using MutationCommandsConstPtr = std::shared_ptr<MutationCommands>;

View File

@ -41,6 +41,7 @@ StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata &
, settings_changes(other.settings_changes ? other.settings_changes->clone() : nullptr)
, select(other.select)
, comment(other.comment)
, metadata_version(other.metadata_version)
{
}
@ -69,6 +70,7 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo
settings_changes.reset();
select = other.select;
comment = other.comment;
metadata_version = other.metadata_version;
return *this;
}
@ -122,6 +124,18 @@ void StorageInMemoryMetadata::setSelectQuery(const SelectQueryDescription & sele
select = select_;
}
void StorageInMemoryMetadata::setMetadataVersion(int32_t metadata_version_)
{
metadata_version = metadata_version_;
}
StorageInMemoryMetadata StorageInMemoryMetadata::withMetadataVersion(int32_t metadata_version_) const
{
StorageInMemoryMetadata copy(*this);
copy.setMetadataVersion(metadata_version_);
return copy;
}
const ColumnsDescription & StorageInMemoryMetadata::getColumns() const
{
return columns;

View File

@ -50,6 +50,10 @@ struct StorageInMemoryMetadata
String comment;
/// Version of metadata. Managed properly by ReplicatedMergeTree only
/// (zero-initialization is important)
int32_t metadata_version = 0;
StorageInMemoryMetadata() = default;
StorageInMemoryMetadata(const StorageInMemoryMetadata & other);
@ -58,7 +62,7 @@ struct StorageInMemoryMetadata
StorageInMemoryMetadata(StorageInMemoryMetadata && other) = default;
StorageInMemoryMetadata & operator=(StorageInMemoryMetadata && other) = default;
/// NOTE: Thread unsafe part. You should modify same StorageInMemoryMetadata
/// NOTE: Thread unsafe part. You should not modify same StorageInMemoryMetadata
/// structure from different threads. It should be used as MultiVersion
/// object. See example in IStorage.
@ -90,6 +94,11 @@ struct StorageInMemoryMetadata
/// Set SELECT query for (Materialized)View
void setSelectQuery(const SelectQueryDescription & select_);
/// Set version of metadata.
void setMetadataVersion(int32_t metadata_version_);
/// Get copy of current metadata with metadata_version_
StorageInMemoryMetadata withMetadataVersion(int32_t metadata_version_) const;
/// Returns combined set of columns
const ColumnsDescription & getColumns() const;
@ -218,6 +227,9 @@ struct StorageInMemoryMetadata
const SelectQueryDescription & getSelectQuery() const;
bool hasSelectQuery() const;
/// Get version of metadata
int32_t getMetadataVersion() const { return metadata_version; }
/// Check that all the requested names are in the table and have the correct types.
void check(const NamesAndTypesList & columns) const;
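
A small sketch (illustrative names only, not the real StorageInMemoryMetadata) of the copy-with-version pattern introduced by withMetadataVersion above: metadata objects are treated as immutable snapshots, so bumping the version produces a fresh copy instead of mutating the shared one.

```cpp
#include <cstdint>
#include <iostream>

struct MetadataSnapshot
{
    int32_t metadata_version = 0;

    void setMetadataVersion(int32_t v) { metadata_version = v; }

    MetadataSnapshot withMetadataVersion(int32_t v) const
    {
        MetadataSnapshot copy(*this);   // everything else is copied unchanged
        copy.setMetadataVersion(v);
        return copy;
    }
};

int main()
{
    MetadataSnapshot current;                                    // version 0
    MetadataSnapshot next = current.withMetadataVersion(7);
    std::cout << current.metadata_version << " " << next.metadata_version << "\n";  // prints: 0 7
}
```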

View File

@ -326,6 +326,24 @@ void StorageMergeTree::alter(
}
else
{
if (!maybe_mutation_commands.empty() && maybe_mutation_commands.containBarrierCommand())
{
int64_t prev_mutation = 0;
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.rbegin();
if (it != current_mutations_by_version.rend())
prev_mutation = it->first;
}
if (prev_mutation != 0)
{
LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation);
waitForMutation(prev_mutation);
LOG_DEBUG(log, "Mutation {} finished", prev_mutation);
}
}
{
changeSettings(new_metadata.settings_changes, table_lock_holder);
checkTTLExpressions(new_metadata, old_metadata);
@ -1150,9 +1168,24 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end());
last_mutation_to_apply = it;
const auto & single_mutation_commands = it->second.commands;
if (single_mutation_commands.containBarrierCommand())
{
if (commands->empty())
{
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
last_mutation_to_apply = it;
}
break;
}
else
{
current_ast_elements += commands_size;
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
last_mutation_to_apply = it;
}
}
assert(commands->empty() == (last_mutation_to_apply == mutations_end_it));
@ -1247,7 +1280,10 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
}
if (mutate_entry)
{
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, metadata_snapshot, mutate_entry, shared_lock, common_assignee_trigger);
/// We take a new metadata snapshot here because mutation commands can be executed only with a metadata snapshot
/// which is at least as fresh as the commands themselves. In an extremely rare case an alter can happen
/// between the moment we took the snapshot above and when we selected the commands. That is why we take a new snapshot here.
auto task = std::make_shared<MutatePlainMergeTreeTask>(*this, getInMemoryMetadataPtr(), mutate_entry, shared_lock, common_assignee_trigger);
assignee.scheduleMergeMutateTask(task);
return true;
}
@ -2116,14 +2152,22 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
}
MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const
std::map<int64_t, MutationCommands> StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (it == current_mutations_by_version.end())
return {};
return it->second.commands;
Int64 part_data_version = part->info.getDataVersion();
std::map<int64_t, MutationCommands> result;
if (!current_mutations_by_version.empty())
{
const auto & [latest_mutation_id, latest_commands] = *current_mutations_by_version.rbegin();
if (part_data_version < static_cast<int64_t>(latest_mutation_id))
{
result[latest_mutation_id] = latest_commands.commands;
}
}
return result;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()
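
To illustrate the batching rule in selectPartsToMutate above, here is a simplified standalone sketch (assumed types, not StorageMergeTree itself): consecutive non-barrier mutations are folded into one batch, while a barrier mutation (e.g. RENAME) is either executed alone or postponed until the current batch is done.

```cpp
#include <iostream>
#include <string>
#include <vector>

struct Mutation { std::string command; bool barrier; };

std::vector<std::string> selectBatch(const std::vector<Mutation> & pending)
{
    std::vector<std::string> batch;
    for (const auto & m : pending)
    {
        if (m.barrier)
        {
            if (batch.empty())
                batch.push_back(m.command);  // fresh barrier: run it on its own
            break;                           // never mix a barrier with other commands
        }
        batch.push_back(m.command);
    }
    return batch;
}

int main()
{
    std::vector<Mutation> pending = {
        {"DELETE WHERE x = 1", false},
        {"UPDATE y = 2 WHERE 1", false},
        {"RENAME COLUMN a TO b", true},   // barrier
        {"DELETE WHERE z = 3", false},
    };
    for (const auto & command : selectBatch(pending))
        std::cout << command << "\n";     // prints only the two commands before the barrier
}
```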

View File

@ -267,7 +267,7 @@ private:
protected:
MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const override;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
};
}

View File

@ -462,7 +462,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
Coordination::Stat metadata_stat;
current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
metadata_version = metadata_stat.version;
setInMemoryMetadata(metadata_snapshot->withMetadataVersion(metadata_stat.version));
}
catch (Coordination::Exception & e)
{
@ -784,7 +784,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(metadata_snapshot->getMetadataVersion()),
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
@ -857,7 +857,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", metadata_snapshot->getColumns().toString(),
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(metadata_snapshot->getMetadataVersion()),
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
@ -1162,16 +1162,19 @@ void StorageReplicatedMergeTree::checkTableStructure(const String & zookeeper_pr
}
void StorageReplicatedMergeTree::setTableStructure(const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff)
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff, int32_t new_metadata_version)
{
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
StorageInMemoryMetadata new_metadata = metadata_diff.getNewMetadata(new_columns, local_context, old_metadata);
new_metadata.setMetadataVersion(new_metadata_version);
/// Even if the primary/sorting/partition keys didn't change we must reinitialize it
/// because primary/partition key column types might have changed.
checkTTLExpressions(new_metadata, old_metadata);
setProperties(new_metadata, old_metadata);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
@ -2793,8 +2796,9 @@ void StorageReplicatedMergeTree::cloneMetadataIfNeeded(const String & source_rep
return;
}
auto metadata_snapshot = getInMemoryMetadataPtr();
Int32 source_metadata_version = parse<Int32>(source_metadata_version_str);
if (metadata_version == source_metadata_version)
if (metadata_snapshot->getMetadataVersion() == source_metadata_version)
return;
/// Our metadata is not up to date with the source replica metadata.
@ -2812,7 +2816,7 @@ void StorageReplicatedMergeTree::cloneMetadataIfNeeded(const String & source_rep
/// if all such entries were cleaned up from the log and source_queue.
LOG_WARNING(log, "Metadata version ({}) on replica is not up to date with metadata ({}) on source replica {}",
metadata_version, source_metadata_version, source_replica);
metadata_snapshot->getMetadataVersion(), source_metadata_version, source_replica);
String source_metadata;
String source_columns;
@ -4987,14 +4991,15 @@ bool StorageReplicatedMergeTree::optimize(
bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMergeTree::LogEntry & entry)
{
if (entry.alter_version < metadata_version)
auto current_metadata = getInMemoryMetadataPtr();
if (entry.alter_version < current_metadata->getMetadataVersion())
{
/// TODO Can we replace it with LOGICAL_ERROR?
/// As for now, it may rarely happen due to reordering of ALTER_METADATA entries in the queue of
/// non-initial replica and also may happen after stale replica recovery.
LOG_WARNING(log, "Attempt to update metadata of version {} "
"to older version {} when processing log entry {}: {}",
metadata_version, entry.alter_version, entry.znode_name, entry.toString());
current_metadata->getMetadataVersion(), entry.alter_version, entry.znode_name, entry.toString());
return true;
}
@ -5042,10 +5047,10 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this, getInMemoryMetadataPtr()).checkAndFindDiff(metadata_from_entry, getInMemoryMetadataPtr()->getColumns(), getContext());
setTableStructure(table_id, alter_context, std::move(columns_from_entry), metadata_diff);
metadata_version = entry.alter_version;
setTableStructure(table_id, alter_context, std::move(columns_from_entry), metadata_diff, entry.alter_version);
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: {}", metadata_version);
current_metadata = getInMemoryMetadataPtr();
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: {}", current_metadata->getMetadataVersion());
}
{
@ -5057,7 +5062,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
/// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node
/// TODO Maybe do it in one transaction for Replicated database?
zookeeper->createOrUpdate(fs::path(replica_path) / "metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent);
zookeeper->createOrUpdate(fs::path(replica_path) / "metadata_version", std::to_string(current_metadata->getMetadataVersion()), zkutil::CreateMode::Persistent);
return true;
}
@ -5181,7 +5186,7 @@ void StorageReplicatedMergeTree::alter(
size_t mutation_path_idx = std::numeric_limits<size_t>::max();
String new_metadata_str = future_metadata_in_zk.toString();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, metadata_version));
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "metadata", new_metadata_str, current_metadata->getMetadataVersion()));
String new_columns_str = future_metadata.columns.toString();
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "columns", new_columns_str, -1));
@ -5197,7 +5202,7 @@ void StorageReplicatedMergeTree::alter(
/// We can be sure, that in case of successful commit in zookeeper our
/// version will increments by 1. Because we update with version check.
int new_metadata_version = metadata_version + 1;
int new_metadata_version = current_metadata->getMetadataVersion() + 1;
alter_entry->type = LogEntry::ALTER_METADATA;
alter_entry->source_replica = replica_name;
@ -7989,9 +7994,9 @@ bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
}
MutationCommands StorageReplicatedMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const
std::map<int64_t, MutationCommands> StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
return queue.getFirstAlterMutationCommandsForPart(part);
return queue.getAlterMutationCommandsForPart(part);
}

View File

@ -216,8 +216,6 @@ public:
/// It's used if not set in engine's arguments while creating a replicated table.
static String getDefaultReplicaName(const ContextPtr & context_);
int getMetadataVersion() const { return metadata_version; }
/// Modify a CREATE TABLE query to make a variant which must be written to a backup.
void adjustCreateQueryForBackup(ASTPtr & create_query) const override;
@ -430,7 +428,6 @@ private:
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
int metadata_version = 0;
/// Threads.
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
@ -517,8 +514,10 @@ private:
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
/// Must be called under IStorage::lockForAlter() lock.
void setTableStructure(const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff);
void setTableStructure(
const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff,
int32_t new_metadata_version);
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
* If any parts described in ZK are not locally, throw an exception.
@ -842,7 +841,7 @@ private:
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
MutationCommands getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const override;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;

View File

@ -124,6 +124,8 @@ TRUSTED_CONTRIBUTORS = {
"tylerhannan", # ClickHouse Employee
"myrrc", # Mike Kot, DoubleCloud
"thevar1able", # ClickHouse Employee
"aalexfvk",
"MikhailBurdukov",
]
}

View File

@ -370,7 +370,9 @@ class ClickHouseCluster:
self.docker_logs_path = p.join(self.instances_dir, "docker.log")
self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
self.env_variables = {}
self.env_variables["TSAN_OPTIONS"] = "second_deadlock_stack=1"
# Problems with glibc 2.36+ [1]
#
# [1]: https://github.com/ClickHouse/ClickHouse/issues/43426#issuecomment-1368512678
self.env_variables["ASAN_OPTIONS"] = "use_sigaltstack=0"
self.env_variables["CLICKHOUSE_WATCHDOG_ENABLE"] = "0"
self.env_variables["CLICKHOUSE_NATS_TLS_SECURE"] = "0"

View File

@ -154,3 +154,40 @@ def test_dependency_via_dictionary_database(node):
node.query(f"DROP DICTIONARY IF EXISTS {d_name} SYNC")
node.query("DROP DATABASE dict_db SYNC")
node.restart_clickhouse()
@pytest.mark.parametrize("node", nodes)
def test_dependent_dict_table_distr(node):
query = node.query
query("CREATE DATABASE test_db;")
query(
"CREATE TABLE test_db.test(id UInt32,data UInt32,key1 UInt8,key2 UInt8) ENGINE=MergeTree ORDER BY id;"
)
query(
"INSERT INTO test_db.test SELECT abs(rand32())%100, rand32()%1000, abs(rand32())%1, abs(rand32())%1 FROM numbers(100);"
)
query(
"CREATE TABLE test_db.dictback (key1 UInt8,key2 UInt8, value UInt8) ENGINE=MergeTree ORDER BY key1;"
)
query("INSERT INTO test_db.dictback VALUES (0,0,0);")
query(
"CREATE DICTIONARY test_db.mdict (key1 UInt8,key2 UInt8, value UInt8) PRIMARY KEY key1,key2"
" SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() DB 'test_db' TABLE 'dictback'))"
" LIFETIME(MIN 100 MAX 100) LAYOUT(COMPLEX_KEY_CACHE(SIZE_IN_CELLS 1000));"
)
query(
"CREATE TABLE test_db.distr (id UInt32, data UInt32, key1 UInt8, key2 UInt8)"
" ENGINE = Distributed('test_shard_localhost', test_db, test, dictGetOrDefault('test_db.mdict','value',(key1,key2),0));"
)
# Tables should load in the correct order.
node.restart_clickhouse()
query("DETACH TABLE test_db.distr;")
query("ATTACH TABLE test_db.distr;")
node.restart_clickhouse()
query("DROP DATABASE IF EXISTS test_db;")

View File

@ -21,23 +21,31 @@ def cluster():
cluster.add_instance(
"node3", main_configs=["configs/storage_conf_web.xml"], with_nginx=True
)
cluster.add_instance(
"node4",
main_configs=["configs/storage_conf.xml"],
with_nginx=True,
stay_alive=True,
with_installed_binary=True,
image="clickhouse/clickhouse-server",
tag="22.8.14.53",
)
cluster.start()
node1 = cluster.instances["node1"]
expected = ""
global uuids
for i in range(3):
node1.query(
def create_table_and_upload_data(node, i):
node.query(
f"CREATE TABLE data{i} (id Int32) ENGINE = MergeTree() ORDER BY id SETTINGS storage_policy = 'def', min_bytes_for_wide_part=1;"
)
for _ in range(10):
node1.query(
node.query(
f"INSERT INTO data{i} SELECT number FROM numbers(500000 * {i+1})"
)
expected = node1.query(f"SELECT * FROM data{i} ORDER BY id")
node.query(f"SELECT * FROM data{i} ORDER BY id")
metadata_path = node1.query(
metadata_path = node.query(
f"SELECT data_paths FROM system.tables WHERE name='data{i}'"
)
metadata_path = metadata_path[
@ -45,7 +53,7 @@ def cluster():
]
print(f"Metadata: {metadata_path}")
node1.exec_in_container(
node.exec_in_container(
[
"bash",
"-c",
@ -56,8 +64,20 @@ def cluster():
user="root",
)
parts = metadata_path.split("/")
uuids.append(parts[3])
print(f"UUID: {parts[3]}")
return parts[3]
node1 = cluster.instances["node1"]
global uuids
for i in range(2):
uuid = create_table_and_upload_data(node1, i)
uuids.append(uuid)
node4 = cluster.instances["node4"]
uuid = create_table_and_upload_data(node4, 2)
uuids.append(uuid)
yield cluster
@ -68,6 +88,7 @@ def cluster():
@pytest.mark.parametrize("node_name", ["node2"])
def test_usage(cluster, node_name):
node1 = cluster.instances["node1"]
node4 = cluster.instances["node4"]
node2 = cluster.instances[node_name]
global uuids
assert len(uuids) == 3
@ -90,7 +111,11 @@ def test_usage(cluster, node_name):
result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
)
assert result == node1.query(
node = node1
if i == 2:
node = node4
assert result == node.query(
"SELECT id FROM data{} WHERE id % 56 = 3 ORDER BY id".format(i)
)
@ -99,7 +124,7 @@ def test_usage(cluster, node_name):
i
)
)
assert result == node1.query(
assert result == node.query(
"SELECT id FROM data{} WHERE id > 789999 AND id < 999999 ORDER BY id".format(
i
)
@ -141,6 +166,7 @@ def test_incorrect_usage(cluster):
@pytest.mark.parametrize("node_name", ["node2"])
def test_cache(cluster, node_name):
node1 = cluster.instances["node1"]
node4 = cluster.instances["node4"]
node2 = cluster.instances[node_name]
global uuids
assert len(uuids) == 3
@ -178,7 +204,12 @@ def test_cache(cluster, node_name):
result = node2.query(
"SELECT id FROM test{} WHERE id % 56 = 3 ORDER BY id".format(i)
)
assert result == node1.query(
node = node1
if i == 2:
node = node4
assert result == node.query(
"SELECT id FROM data{} WHERE id % 56 = 3 ORDER BY id".format(i)
)
@ -187,7 +218,7 @@ def test_cache(cluster, node_name):
i
)
)
assert result == node1.query(
assert result == node.query(
"SELECT id FROM data{} WHERE id > 789999 AND id < 999999 ORDER BY id".format(
i
)

View File

@ -41,8 +41,9 @@ node = cluster.add_instance(
"node",
main_configs=["configs/grpc_config.xml"],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
# second_deadlock_stack -- just ordinary option we use everywhere, don't want to overwrite it
env_variables={"TSAN_OPTIONS": "report_atomic_races=0 second_deadlock_stack=1"},
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
},
)
main_channel = None

View File

@ -43,8 +43,9 @@ node = cluster.add_instance(
"configs/ca-cert.pem",
],
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
# second_deadlock_stack -- just ordinary option we use everywhere, don't want to overwrite it
env_variables={"TSAN_OPTIONS": "report_atomic_races=0 second_deadlock_stack=1"},
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
},
)

View File

@ -98,3 +98,23 @@ def test_replica_inserts_with_keeper_disconnect(started_cluster):
finally:
node1.query("DROP TABLE IF EXISTS r SYNC")
def test_query_timeout_with_zk_down(started_cluster):
try:
node1.query(
"CREATE TABLE zk_down (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/zk_down', '0') ORDER BY tuple()"
)
cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
start_time = time.time()
with pytest.raises(QueryRuntimeException):
node1.query(
"INSERT INTO zk_down SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=10000, insert_keeper_retry_max_backoff_ms=1000, max_execution_time=1"
)
finish_time = time.time()
assert finish_time - start_time < 10
finally:
cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
node1.query("DROP TABLE IF EXISTS zk_down SYNC")

View File

@ -167,6 +167,7 @@ def test_session_close_shutdown(started_cluster):
eph_node = "/test_node"
node2_zk.create(eph_node, ephemeral=True)
node1_zk.sync(eph_node)
assert node1_zk.exists(eph_node) != None
# shutdown while session is active

View File

@ -43,8 +43,18 @@ def create_table(cluster, table_name, additional_settings=None):
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
@pytest.fixture(scope="module")

View File

@ -52,8 +52,18 @@ def cluster():
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
def create_table(node, table_name, **additional_settings):
@ -232,7 +242,6 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
def test_alter_table_columns(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))

View File

@ -89,7 +89,7 @@ def drop_table(cluster):
# S3 request will be failed for an appropriate part file write.
FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_BASE = 6 # partition.dat, metadata_version.txt, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_WIDE = (
FILES_PER_PART_BASE + 1 + 1 + 3 * 2
) # Primary index, MinMax, Mark and data file for column(s)

View File

@ -105,6 +105,8 @@ def partition_complex_assert_checksums():
"c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition_complex/19700102_2_2_0/count.txt\n"
"c4ca4238a0b923820dcc509a6f75849b\tshadow/1/data/test/partition_complex/19700201_1_1_0/count.txt\n"
"cfcb770c3ecd0990dcceb1bde129e6c6\tshadow/1/data/test/partition_complex/19700102_2_2_0/p.bin\n"
"cfcd208495d565ef66e7dff9f98764da\tshadow/1/data/test/partition_complex/19700102_2_2_0/metadata_version.txt\n"
"cfcd208495d565ef66e7dff9f98764da\tshadow/1/data/test/partition_complex/19700201_1_1_0/metadata_version.txt\n"
"e2af3bef1fd129aea73a890ede1e7a30\tshadow/1/data/test/partition_complex/19700201_1_1_0/k.bin\n"
"f2312862cc01adf34a93151377be2ddf\tshadow/1/data/test/partition_complex/19700201_1_1_0/minmax_p.idx\n"
)

View File

@ -44,8 +44,18 @@ def cluster():
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
def random_string(length):

View File

@ -47,8 +47,18 @@ def cluster():
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1
FILES_OVERHEAD_PER_PART_WIDE = (
FILES_OVERHEAD_PER_COLUMN * 3
+ 2
+ 6
+ FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
+ FILES_OVERHEAD_METADATA_VERSION
)
FILES_OVERHEAD_PER_PART_COMPACT = (
10 + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + FILES_OVERHEAD_METADATA_VERSION
)
def random_string(length):

View File

@ -86,9 +86,9 @@ def test_ttl_move_and_s3(started_cluster):
print(f"Total objects: {counter}")
if counter == 300:
if counter == 330:
break
print(f"Attempts remaining: {attempt}")
assert counter == 300
assert counter == 330

View File

@ -12,6 +12,7 @@ import pymysql.connections
import pymysql.err
import pytest
import sys
import os
import time
import logging
from helpers.cluster import ClickHouseCluster, run_and_check
@ -34,8 +35,9 @@ instance = cluster.add_instance(
user_configs=["configs/default_passwd.xml"],
with_zookeeper=True,
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
    # second_deadlock_stack -- just an ordinary option that we use everywhere; we don't want to overwrite it
env_variables={"TSAN_OPTIONS": "report_atomic_races=0 second_deadlock_stack=1"},
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
},
)

View File

@ -0,0 +1,120 @@
---- toDecimal32 ----
54.1234
1.1111
---- toDecimal64 ----
54.1234
1.1111
---- toDecimal128 ----
54.1234
1.1111
---- toDecimal256 ----
54.1234
1.1111
---- toDecimal32OrDefault ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal32OrNull ----
54.1234
1.1111
\N
\N
\N
\N
\N
\N
---- toDecimal32OrZero ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal64OrDefault ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal64OrZero ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal64OrNull ----
54.1234
1.1111
\N
\N
\N
\N
\N
\N
---- toDecimal128OrDefault ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal128OrNull ----
54.1234
1.1111
\N
\N
\N
\N
\N
\N
---- toDecimal128OrZero ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal256OrDefault ----
54.1234
1.1111
0
0
0
0
0
0
---- toDecimal256OrNull ----
54.1234
1.1111
\N
\N
\N
\N
\N
\N
---- toDecimal256OrZero ----
54.1234
1.1111
0
0
0
0
0
0

View File

@ -0,0 +1,32 @@
{% for func in [ "toDecimal32", "toDecimal64", "toDecimal128", "toDecimal256" ] -%}
SELECT '---- {{ func }} ----';
SELECT {{ func }} ('54.1234567', 4);
SELECT {{ func }} ('1.1111111111111111111111111111111111111', 4);
SELECT {{ func }} ('x123', 4); -- { serverError CANNOT_PARSE_TEXT }
SELECT {{ func }} ('', 4); -- { serverError ATTEMPT_TO_READ_AFTER_EOF }
SELECT {{ func }} ('\0', 4); -- { serverError CANNOT_PARSE_TEXT }
SELECT {{ func }} ('\0\0\0\0\0', 4); -- { serverError CANNOT_PARSE_TEXT }
SELECT {{ func }} ('\n\t\r', 4); -- { serverError CANNOT_PARSE_TEXT }
SELECT {{ func }} ('\'', 4); -- { serverError CANNOT_PARSE_TEXT }
{% endfor -%}
{% for func in [ "toDecimal32OrDefault", "toDecimal32OrNull", "toDecimal32OrZero",
"toDecimal64OrDefault", "toDecimal64OrZero", "toDecimal64OrNull",
"toDecimal128OrDefault", "toDecimal128OrNull", "toDecimal128OrZero",
"toDecimal256OrDefault", "toDecimal256OrNull", "toDecimal256OrZero" ] -%}
SELECT '---- {{ func }} ----';
SELECT {{ func }} ('54.1234567', 4);
SELECT {{ func }} ('1.1111111111111111111111111111111111111', 4);
SELECT {{ func }} ('x123', 4);
SELECT {{ func }} ('', 4);
SELECT {{ func }} ('\0', 4);
SELECT {{ func }} ('\0\0\0\0\0', 4);
SELECT {{ func }} ('\n\t\r', 4);
SELECT {{ func }} ('\'', 4);
{% endfor -%}

View File

@ -1,7 +1,7 @@
CREATE TABLE default.rename_table\n(\n `key` Int32,\n `old_value1` Int32,\n `value1` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table\n(\n `key` Int32,\n `old_value1` Int32,\n `value1` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key old_value1 value1
1 2 3
CREATE TABLE default.rename_table\n(\n `k` Int32,\n `v1` Int32,\n `v2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table\n(\n `k` Int32,\n `v1` Int32,\n `v2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
k v1 v2
1 2 3
4 5 6

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS rename_table;
CREATE TABLE rename_table (key Int32, value1 Int32, value2 Int32) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE rename_table (key Int32, value1 Int32, value2 Int32) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part=0;
INSERT INTO rename_table VALUES (1, 2, 3);

View File

@ -1,11 +1,11 @@
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key value1_string value2
1 2 3
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int32,\n `value2` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int32,\n `value2` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key value1_string value2_old value2
1 2 3 7
4 5 6 7
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32,\n `value1_string` String,\n `value2_old` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
key value1_string value2_old
1 2 7
4 5 7

View File

@ -1,6 +1,6 @@
DROP TABLE IF EXISTS rename_table_multiple;
CREATE TABLE rename_table_multiple (key Int32, value1 String, value2 Int32) ENGINE = MergeTree ORDER BY tuple();
CREATE TABLE rename_table_multiple (key Int32, value1 String, value2 Int32) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part=0;
INSERT INTO rename_table_multiple VALUES (1, 2, 3);

View File

@ -34,6 +34,10 @@ Table array decimal avg
3.5
0
2
2023-04-05 00:25:24 2023-04-05 00:25:23 [0,1]
2023-04-05 00:25:24.124 2023-04-05 00:25:23.123 [0,1.001]
2023-04-06 2023-04-05 [0,1]
2023-04-06 2023-04-05 [0,1]
Types of aggregation result array min
Int8 Int16 Int32 Int64
UInt8 UInt16 UInt32 UInt64

View File

@ -34,6 +34,11 @@ SELECT arrayAvg(x) FROM test_aggregation;
DROP TABLE test_aggregation;
WITH ['2023-04-05 00:25:23', '2023-04-05 00:25:24']::Array(DateTime) AS dt SELECT arrayMax(dt), arrayMin(dt), arrayDifference(dt);
WITH ['2023-04-05 00:25:23.123', '2023-04-05 00:25:24.124']::Array(DateTime64(3)) AS dt SELECT arrayMax(dt), arrayMin(dt), arrayDifference(dt);
WITH ['2023-04-05', '2023-04-06']::Array(Date) AS d SELECT arrayMax(d), arrayMin(d), arrayDifference(d);
WITH ['2023-04-05', '2023-04-06']::Array(Date32) AS d SELECT arrayMax(d), arrayMin(d), arrayDifference(d);
SELECT 'Types of aggregation result array min';
SELECT toTypeName(arrayMin([toInt8(0)])), toTypeName(arrayMin([toInt16(0)])), toTypeName(arrayMin([toInt32(0)])), toTypeName(arrayMin([toInt64(0)]));
SELECT toTypeName(arrayMin([toUInt8(0)])), toTypeName(arrayMin([toUInt16(0)])), toTypeName(arrayMin([toUInt32(0)])), toTypeName(arrayMin([toUInt64(0)]));

View File

@ -7,25 +7,25 @@ file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
7
7
8
8
0
2
2
7
8
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
7
7
7
7
21
31
38
8
8
8
8
24
35
43
5010500
18816
Using storage policy: local_cache
@ -37,24 +37,24 @@ file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
7
7
8
8
0
2
2
7
8
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
7
7
7
7
21
31
38
8
8
8
8
24
35
43
5010500
18816

View File

@ -44,8 +44,8 @@ for i in {1..100}; do
")"
    # Non-retriable errors
if [[ $FileSync -ne 7 ]]; then
echo "FileSync: $FileSync != 11" >&2
if [[ $FileSync -ne 8 ]]; then
echo "FileSync: $FileSync != 8" >&2
exit 2
fi
    # Check that all files were synced

View File

@ -0,0 +1,8 @@
1 2 3
4 5 6
{"column1_renamed":"1","column2_renamed":"2","column3":"3"}
{"column1_renamed":"4","column2_renamed":"5","column3":"6"}
1 2 3
4 5 6
{"column1_renamed":"1","column2_renamed":"2","column3":"3"}
{"column1_renamed":"4","column2_renamed":"5","column3":"6"}

View File

@ -0,0 +1,59 @@
DROP TABLE IF EXISTS wrong_metadata;
CREATE TABLE wrong_metadata(
column1 UInt64,
column2 UInt64,
column3 UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/wrong_metadata', '1')
ORDER BY tuple();
INSERT INTO wrong_metadata VALUES (1, 2, 3);
SYSTEM STOP REPLICATION QUEUES wrong_metadata;
ALTER TABLE wrong_metadata RENAME COLUMN column1 TO column1_renamed SETTINGS replication_alter_partitions_sync = 0;
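-- With replication queues stopped and replication_alter_partitions_sync = 0, the rename is only queued and not applied yet.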
INSERT INTO wrong_metadata VALUES (4, 5, 6);
SELECT * FROM wrong_metadata ORDER BY column1;
SYSTEM START REPLICATION QUEUES wrong_metadata;
SYSTEM SYNC REPLICA wrong_metadata;
ALTER TABLE wrong_metadata RENAME COLUMN column2 to column2_renamed SETTINGS replication_alter_partitions_sync = 2;
SELECT * FROM wrong_metadata ORDER BY column1_renamed FORMAT JSONEachRow;
DROP TABLE IF EXISTS wrong_metadata;
CREATE TABLE wrong_metadata_wide(
column1 UInt64,
column2 UInt64,
column3 UInt64
)
ENGINE ReplicatedMergeTree('/test/{database}/tables/wrong_metadata_wide', '1')
ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO wrong_metadata_wide VALUES (1, 2, 3);
SYSTEM STOP REPLICATION QUEUES wrong_metadata_wide;
ALTER TABLE wrong_metadata_wide RENAME COLUMN column1 TO column1_renamed SETTINGS replication_alter_partitions_sync = 0;
INSERT INTO wrong_metadata_wide VALUES (4, 5, 6);
SELECT * FROM wrong_metadata_wide ORDER BY column1;
SYSTEM START REPLICATION QUEUES wrong_metadata_wide;
SYSTEM SYNC REPLICA wrong_metadata_wide;
ALTER TABLE wrong_metadata_wide RENAME COLUMN column2 to column2_renamed SETTINGS replication_alter_partitions_sync = 2;
SELECT * FROM wrong_metadata_wide ORDER BY column1_renamed FORMAT JSONEachRow;
DROP TABLE IF EXISTS wrong_metadata_wide;

View File

@ -0,0 +1 @@
{"v":"1","v2":"77"}

View File

@ -0,0 +1,58 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_to_rename"
$CLICKHOUSE_CLIENT --query="CREATE TABLE table_to_rename(v UInt64, v1 UInt64)ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0"
$CLICKHOUSE_CLIENT --query="INSERT INTO table_to_rename VALUES (1, 1)"
# We want the following mutations to get stuck.
# That is why we stop merges and wait in a loop until they actually start.
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES table_to_rename"
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename RENAME COLUMN v1 to v2" &
counter=0 retries=60
I=0
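# Wait (up to retries * 0.1s) until the background RENAME shows up in SHOW CREATE TABLE.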
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "show create table table_to_rename")
if [[ $result == *"v2"* ]]; then
break;
fi
sleep 0.1
((++counter))
done
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename UPDATE v2 = 77 WHERE 1 = 1 SETTINGS mutations_sync = 2" &
counter=0 retries=60
I=0
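# Wait until both mutations are visible in system.mutations.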
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SELECT count() from system.mutations where database='${CLICKHOUSE_DATABASE}' and table='table_to_rename'")
if [[ $result == "2" ]]; then
break;
fi
sleep 0.1
((++counter))
done
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES table_to_rename"
wait
$CLICKHOUSE_CLIENT --query="SELECT * FROM table_to_rename FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_to_rename"

View File

@ -0,0 +1 @@
{"v":"1","v2":"77"}

View File

@ -0,0 +1,48 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_to_rename"
$CLICKHOUSE_CLIENT --query="CREATE TABLE table_to_rename(v UInt64, v1 UInt64)ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0"
$CLICKHOUSE_CLIENT --query="INSERT INTO table_to_rename VALUES (1, 1)"
# We want the following mutations to get stuck.
# That is why we stop merges and wait in a loop until they actually start.
$CLICKHOUSE_CLIENT --query="SYSTEM STOP MERGES table_to_rename"
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename UPDATE v1 = 77 WHERE 1 = 1 SETTINGS mutations_sync = 2" &
counter=0 retries=60
I=0
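# Wait (up to retries * 0.1s) until the background UPDATE mutation appears in system.mutations.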
while [[ $counter -lt $retries ]]; do
I=$((I + 1))
result=$($CLICKHOUSE_CLIENT --query "SELECT count() from system.mutations where database='${CLICKHOUSE_DATABASE}' and table='table_to_rename'")
if [[ $result == "1" ]]; then
break;
fi
sleep 0.1
((++counter))
done
$CLICKHOUSE_CLIENT --query="ALTER TABLE table_to_rename RENAME COLUMN v1 to v2" &
# The sleep below will not introduce any flakiness;
# we just wait to make sure the mutation does not start.
sleep 3
$CLICKHOUSE_CLIENT --query="SYSTEM START MERGES table_to_rename"
wait
$CLICKHOUSE_CLIENT --query="SELECT * FROM table_to_rename FORMAT JSONEachRow"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS table_to_rename"

View File

@ -0,0 +1,26 @@
{"a1":"1","b1":"2","c":"3"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
{"a1":"7","b1":"8","c":"9"}
~~~~~~~
{"b":"1","a":"2","c":"3"}
{"b":"4","a":"5","c":"6"}
{"b":"7","a":"8","c":"9"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
~~~~~~~
{"a1":"1","b1":"2","c":"3"}
{"a1":"4","b1":"5","c":"6"}
{"a1":"7","b1":"8","c":"9"}
~~~~~~~
{"b":"1","a":"2","c":"3"}
{"b":"4","a":"5","c":"6"}
{"b":"7","a":"8","c":"9"}
~~~~~~~

Some files were not shown because too many files have changed in this diff.