Merge branch 'master' into export-logs-in-ci

This commit is contained in:
Alexey Milovidov 2023-08-11 22:54:28 +02:00
commit 0317ec135d
17 changed files with 206 additions and 49 deletions

View File

@ -169,7 +169,6 @@ host = '127.0.0.1',
port = 3306,
database = 'test',
connection_pool_size = 8,
on_duplicate_clause = 1,
replace_query = 1
```
@ -185,7 +184,6 @@ replace_query = 1
<port>3306</port>
<database>test</database>
<connection_pool_size>8</connection_pool_size>
<on_duplicate_clause>1</on_duplicate_clause>
<replace_query>1</replace_query>
</mymysql>
</named_collections>

View File

@ -88,7 +88,6 @@ SELECT * FROM s3_engine_table LIMIT 3;
<port>3306</port>
<database>test</database>
<connection_pool_size>8</connection_pool_size>
<on_duplicate_clause>1</on_duplicate_clause>
<replace_query>1</replace_query>
</mymysql>
</named_collections>

View File

@ -755,6 +755,9 @@ class FunctionBinaryArithmetic : public IFunction
static constexpr bool is_multiply = IsOperation<Op>::multiply;
static constexpr bool is_division = IsOperation<Op>::division;
static constexpr bool is_bit_hamming_distance = IsOperation<Op>::bit_hamming_distance;
static constexpr bool is_modulo = IsOperation<Op>::modulo;
static constexpr bool is_div_int = IsOperation<Op>::div_int;
static constexpr bool is_div_int_or_zero = IsOperation<Op>::div_int_or_zero;
ContextPtr context;
bool check_decimal_overflow = true;
@ -964,13 +967,28 @@ class FunctionBinaryArithmetic : public IFunction
"argument of numeric type cannot be first", name);
std::string function_name;
if (is_multiply)
if constexpr (is_multiply)
{
function_name = "tupleMultiplyByNumber";
}
else
else // is_division
{
function_name = "tupleDivideByNumber";
if constexpr (is_modulo)
{
function_name = "tupleModuloByNumber";
}
else if constexpr (is_div_int)
{
function_name = "tupleIntDivByNumber";
}
else if constexpr (is_div_int_or_zero)
{
function_name = "tupleIntDivOrZeroByNumber";
}
else
{
function_name = "tupleDivideByNumber";
}
}
return FunctionFactory::instance().get(function_name, context);

View File

@ -60,7 +60,7 @@ struct IsOperation
static constexpr bool bit_hamming_distance = IsSameOperation<Op, BitHammingDistanceImpl>::value;
static constexpr bool division = div_floating || div_int || div_int_or_zero;
static constexpr bool division = div_floating || div_int || div_int_or_zero || modulo;
static constexpr bool allow_decimal = plus || minus || multiply || division || least || greatest;
};

View File

@ -23,6 +23,9 @@ struct PlusName { static constexpr auto name = "plus"; };
struct MinusName { static constexpr auto name = "minus"; };
struct MultiplyName { static constexpr auto name = "multiply"; };
struct DivideName { static constexpr auto name = "divide"; };
/// Name tags for the modulo, intDiv and intDivOrZero operations. Each one only carries
/// the function name string and is used to instantiate FunctionTupleOperator and
/// FunctionTupleOperatorByNumber for the corresponding tuple functions.
struct ModuloName { static constexpr auto name = "modulo"; };
struct IntDivName { static constexpr auto name = "intDiv"; };
struct IntDivOrZeroName { static constexpr auto name = "intDivOrZero"; };
struct L1Label { static constexpr auto name = "1"; };
struct L2Label { static constexpr auto name = "2"; };
@ -141,6 +144,12 @@ using FunctionTupleMultiply = FunctionTupleOperator<MultiplyName>;
using FunctionTupleDivide = FunctionTupleOperator<DivideName>;
using FunctionTupleModulo = FunctionTupleOperator<ModuloName>;
using FunctionTupleIntDiv = FunctionTupleOperator<IntDivName>;
using FunctionTupleIntDivOrZero = FunctionTupleOperator<IntDivOrZeroName>;
class FunctionTupleNegate : public ITupleFunction
{
public:
@ -297,6 +306,12 @@ using FunctionTupleMultiplyByNumber = FunctionTupleOperatorByNumber<MultiplyName
using FunctionTupleDivideByNumber = FunctionTupleOperatorByNumber<DivideName>;
using FunctionTupleModuloByNumber = FunctionTupleOperatorByNumber<ModuloName>;
using FunctionTupleIntDivByNumber = FunctionTupleOperatorByNumber<IntDivName>;
using FunctionTupleIntDivOrZeroByNumber = FunctionTupleOperatorByNumber<IntDivOrZeroName>;
class FunctionDotProduct : public ITupleFunction
{
public:
@ -1563,6 +1578,9 @@ REGISTER_FUNCTION(VectorFunctions)
factory.registerAlias("vectorDifference", FunctionTupleMinus::name, FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionTupleMultiply>();
factory.registerFunction<FunctionTupleDivide>();
factory.registerFunction<FunctionTupleModulo>();
factory.registerFunction<FunctionTupleIntDiv>();
factory.registerFunction<FunctionTupleIntDivOrZero>();
factory.registerFunction<FunctionTupleNegate>();
factory.registerFunction<FunctionAddTupleOfIntervals>(FunctionDocumentation
@ -1626,6 +1644,9 @@ If the types of the first interval (or the interval in the tuple) and the second
factory.registerFunction<FunctionTupleMultiplyByNumber>();
factory.registerFunction<FunctionTupleDivideByNumber>();
factory.registerFunction<FunctionTupleModuloByNumber>();
factory.registerFunction<FunctionTupleIntDivByNumber>();
factory.registerFunction<FunctionTupleIntDivOrZeroByNumber>();
factory.registerFunction<TupleOrArrayFunctionDotProduct>();
factory.registerAlias("scalarProduct", TupleOrArrayFunctionDotProduct::name, FunctionFactory::CaseInsensitive);

View File

@ -113,13 +113,14 @@ QueryTreeNodePtr prepareQueryAffectedQueryTree(const std::vector<MutationCommand
ColumnDependencies getAllColumnDependencies(
const StorageMetadataPtr & metadata_snapshot,
const NameSet & updated_columns,
const std::function<bool(const String & file_name)> & has_index_or_projection)
const StorageInMemoryMetadata::HasDependencyCallback & has_dependency)
{
NameSet new_updated_columns = updated_columns;
ColumnDependencies dependencies;
while (!new_updated_columns.empty())
{
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_index_or_projection);
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_dependency);
new_updated_columns.clear();
for (const auto & dependency : new_dependencies)
{
@ -292,9 +293,14 @@ bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const
return data && data->getSettings()->materialize_ttl_recalculate_only;
}
bool MutationsInterpreter::Source::hasIndexOrProjection(const String & file_name) const
bool MutationsInterpreter::Source::hasSecondaryIndex(const String & name) const
{
return part && part->checksums.has(file_name);
return part && part->hasSecondaryIndex(name);
}
/// Reports whether the source has a part and that part contains the projection `name`.
/// When there is no part the answer is always false.
bool MutationsInterpreter::Source::hasProjection(const String & name) const
{
    if (!part)
        return false;

    return part->hasProjection(name);
}
static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage)
@ -533,13 +539,24 @@ void MutationsInterpreter::prepare(bool dry_run)
validateUpdateColumns(source, metadata_snapshot, updated_columns, column_to_affected_materialized);
}
std::function<bool(const String & file_name)> has_index_or_projection
= [&](const String & file_name) { return source.hasIndexOrProjection(file_name); };
/// Callback handed to the column-dependency lookups below. For projections and skip
/// indices it asks the mutation source whether an object with that name exists;
/// every other dependency kind is reported as present.
StorageInMemoryMetadata::HasDependencyCallback has_dependency =
[&](const String & name, ColumnDependency::Kind kind)
{
if (kind == ColumnDependency::PROJECTION)
return source.hasProjection(name);
if (kind == ColumnDependency::SKIP_INDEX)
return source.hasSecondaryIndex(name);
/// Remaining kinds are not filtered by the source.
return true;
};
if (settings.recalculate_dependencies_of_updated_columns)
dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns, has_index_or_projection);
dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns, has_dependency);
bool has_alter_delete = false;
std::vector<String> read_columns;
/// First, break a sequence of commands into stages.
for (auto & command : commands)
{
@ -558,6 +575,7 @@ void MutationsInterpreter::prepare(bool dry_run)
predicate = makeASTFunction("isZeroOrNull", predicate);
stages.back().filters.push_back(predicate);
has_alter_delete = true;
}
else if (command.type == MutationCommand::UPDATE)
{
@ -692,8 +710,7 @@ void MutationsInterpreter::prepare(bool dry_run)
if (it == std::cend(indices_desc))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown index: {}", command.index_name);
if (!source.hasIndexOrProjection("skp_idx_" + it->name + ".idx")
&& !source.hasIndexOrProjection("skp_idx_" + it->name + ".idx2"))
if (!source.hasSecondaryIndex(it->name))
{
auto query = (*it).expression_list_ast->clone();
auto syntax_result = TreeRewriter(context).analyze(query, all_columns);
@ -707,7 +724,7 @@ void MutationsInterpreter::prepare(bool dry_run)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);
const auto & projection = projections_desc.get(command.projection_name);
if (!source.hasIndexOrProjection(projection.getDirectoryName()))
if (!source.hasProjection(projection.name))
{
for (const auto & column : projection.required_columns)
dependencies.emplace(column, ColumnDependency::PROJECTION);
@ -731,8 +748,9 @@ void MutationsInterpreter::prepare(bool dry_run)
{
// just recalculate ttl_infos without remove expired data
auto all_columns_vec = all_columns.getNames();
auto new_dependencies = metadata_snapshot->getColumnDependencies(
NameSet(all_columns_vec.begin(), all_columns_vec.end()), false, has_index_or_projection);
auto all_columns_set = NameSet(all_columns_vec.begin(), all_columns_vec.end());
auto new_dependencies = metadata_snapshot->getColumnDependencies(all_columns_set, false, has_dependency);
for (const auto & dependency : new_dependencies)
{
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
@ -757,8 +775,8 @@ void MutationsInterpreter::prepare(bool dry_run)
}
auto all_columns_vec = all_columns.getNames();
auto all_dependencies = getAllColumnDependencies(
metadata_snapshot, NameSet(all_columns_vec.begin(), all_columns_vec.end()), has_index_or_projection);
auto all_columns_set = NameSet(all_columns_vec.begin(), all_columns_vec.end());
auto all_dependencies = getAllColumnDependencies(metadata_snapshot, all_columns_set, has_dependency);
for (const auto & dependency : all_dependencies)
{
@ -767,7 +785,7 @@ void MutationsInterpreter::prepare(bool dry_run)
}
/// Recalc only skip indices and projections of columns which could be updated by TTL.
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_index_or_projection);
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_dependency);
for (const auto & dependency : new_dependencies)
{
if (dependency.kind == ColumnDependency::SKIP_INDEX || dependency.kind == ColumnDependency::PROJECTION)
@ -861,30 +879,44 @@ void MutationsInterpreter::prepare(bool dry_run)
for (const auto & index : metadata_snapshot->getSecondaryIndices())
{
if (source.hasIndexOrProjection("skp_idx_" + index.name + ".idx") || source.hasIndexOrProjection("skp_idx_" + index.name + ".idx2"))
if (!source.hasSecondaryIndex(index.name))
continue;
if (has_alter_delete)
{
const auto & index_cols = index.expression->getRequiredColumns();
bool changed = std::any_of(
index_cols.begin(),
index_cols.end(),
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
if (changed)
materialized_indices.insert(index.name);
materialized_indices.insert(index.name);
continue;
}
const auto & index_cols = index.expression->getRequiredColumns();
bool changed = std::any_of(
index_cols.begin(),
index_cols.end(),
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
if (changed)
materialized_indices.insert(index.name);
}
for (const auto & projection : metadata_snapshot->getProjections())
{
if (source.hasIndexOrProjection(projection.getDirectoryName()))
if (!source.hasProjection(projection.name))
continue;
if (has_alter_delete)
{
const auto & projection_cols = projection.required_columns;
bool changed = std::any_of(
projection_cols.begin(),
projection_cols.end(),
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
if (changed)
materialized_projections.insert(projection.name);
materialized_projections.insert(projection.name);
continue;
}
const auto & projection_cols = projection.required_columns;
bool changed = std::any_of(
projection_cols.begin(),
projection_cols.end(),
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
if (changed)
materialized_projections.insert(projection.name);
}
/// Stages might be empty when we materialize skip indices or projections which don't add any

View File

@ -120,7 +120,8 @@ public:
bool supportsLightweightDelete() const;
bool hasLightweightDeleteMask() const;
bool materializeTTLRecalculateOnly() const;
bool hasIndexOrProjection(const String & file_name) const;
bool hasSecondaryIndex(const String & name) const;
bool hasProjection(const String & name) const;
void read(
Stage & first_stage,

View File

@ -1983,6 +1983,12 @@ IndexSize IMergeTreeDataPart::getSecondaryIndexSize(const String & secondary_ind
return ColumnSize{};
}
/// Returns true when the part's checksums contain a file for the secondary index
/// `index_name`, i.e. INDEX_FILE_PREFIX + index_name with either the ".idx" or the
/// ".idx2" extension.
bool IMergeTreeDataPart::hasSecondaryIndex(const String & index_name) const
{
    const auto index_file_stem = INDEX_FILE_PREFIX + index_name;

    if (checksums.has(index_file_stem + ".idx"))
        return true;

    return checksums.has(index_file_stem + ".idx2");
}
void IMergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
for (const auto & [column_name, size] : columns_sizes)

View File

@ -122,6 +122,9 @@ public:
/// Otherwise return information about secondary index size on disk.
IndexSize getSecondaryIndexSize(const String & secondary_index_name) const;
/// Returns true if there is materialized index with specified name in part.
bool hasSecondaryIndex(const String & index_name) const;
/// Return information about column size on disk for all columns in part
ColumnSize getTotalColumnsSize() const { return total_columns_size; }

View File

@ -453,6 +453,7 @@ static ExecuteTTLType shouldExecuteTTL(const StorageMetadataPtr & metadata_snaps
/// Return set of indices which should be recalculated during mutation also
/// wraps input stream into additional expression stream
static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
const MergeTreeDataPartPtr & source_part,
QueryPipelineBuilder & builder,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
@ -463,10 +464,15 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
std::set<MergeTreeIndexPtr> indices_to_recalc;
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
const auto & indices = metadata_snapshot->getSecondaryIndices();
bool is_full_part_storage = isFullPartStorage(source_part->getDataPartStorage());
for (const auto & index : indices)
{
if (materialized_indices.contains(index.name))
bool need_recalculate =
materialized_indices.contains(index.name)
|| (!is_full_part_storage && source_part->hasSecondaryIndex(index.name));
if (need_recalculate)
{
if (indices_to_recalc.insert(index_factory.get(index)).second)
{
@ -496,15 +502,23 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
}
static std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
const MergeTreeDataPartPtr & source_part,
const StorageMetadataPtr & metadata_snapshot,
const NameSet & materialized_projections)
{
std::set<ProjectionDescriptionRawPtr> projections_to_recalc;
bool is_full_part_storage = isFullPartStorage(source_part->getDataPartStorage());
for (const auto & projection : metadata_snapshot->getProjections())
{
if (materialized_projections.contains(projection.name))
bool need_recalculate =
materialized_projections.contains(projection.name)
|| (!is_full_part_storage && source_part->hasProjection(projection.name));
if (need_recalculate)
projections_to_recalc.insert(&projection);
}
return projections_to_recalc;
}
@ -1279,14 +1293,20 @@ private:
removed_indices.insert(command.column_name);
}
bool is_full_part_storage = isFullPartStorage(ctx->new_data_part->getDataPartStorage());
const auto & indices = ctx->metadata_snapshot->getSecondaryIndices();
MergeTreeIndices skip_indices;
for (const auto & idx : indices)
{
if (removed_indices.contains(idx.name))
continue;
if (ctx->materialized_indices.contains(idx.name))
bool need_recalculate =
ctx->materialized_indices.contains(idx.name)
|| (!is_full_part_storage && ctx->source_part->hasSecondaryIndex(idx.name));
if (need_recalculate)
{
skip_indices.push_back(MergeTreeIndexFactory::instance().get(idx));
}
@ -1319,7 +1339,11 @@ private:
if (removed_projections.contains(projection.name))
continue;
if (ctx->materialized_projections.contains(projection.name))
bool need_recalculate =
ctx->materialized_projections.contains(projection.name)
|| (!is_full_part_storage && ctx->source_part->hasProjection(projection.name));
if (need_recalculate)
{
ctx->projections_to_build.push_back(&projection);
}
@ -1921,9 +1945,16 @@ bool MutateTask::prepare()
else /// TODO: check that we modify only non-key columns in this case.
{
ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate(
ctx->mutating_pipeline_builder, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices);
ctx->source_part,
ctx->mutating_pipeline_builder,
ctx->metadata_snapshot,
ctx->context,
ctx->materialized_indices);
ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(ctx->metadata_snapshot, ctx->materialized_projections);
ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(
ctx->source_part,
ctx->metadata_snapshot,
ctx->materialized_projections);
ctx->files_to_skip = MutationHelpers::collectFilesToSkip(
ctx->source_part,

View File

@ -239,7 +239,7 @@ bool StorageInMemoryMetadata::hasAnyGroupByTTL() const
ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(
const NameSet & updated_columns,
bool include_ttl_target,
const std::function<bool(const String & file_name)> & has_indice_or_projection) const
const HasDependencyCallback & has_dependency) const
{
if (updated_columns.empty())
return {};
@ -268,13 +268,13 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(
for (const auto & index : getSecondaryIndices())
{
if (has_indice_or_projection("skp_idx_" + index.name + ".idx") || has_indice_or_projection("skp_idx_" + index.name + ".idx2"))
if (has_dependency(index.name, ColumnDependency::SKIP_INDEX))
add_dependent_columns(index.expression, indices_columns);
}
for (const auto & projection : getProjections())
{
if (has_indice_or_projection(projection.getDirectoryName()))
if (has_dependency(projection.name, ColumnDependency::PROJECTION))
add_dependent_columns(&projection, projections_columns);
}

View File

@ -147,12 +147,14 @@ struct StorageInMemoryMetadata
TTLDescriptions getGroupByTTLs() const;
bool hasAnyGroupByTTL() const;
/// Predicate taking a dependency name and its kind. getColumnDependencies() calls it for
/// every skip index and projection and only collects columns of those it returns true for.
using HasDependencyCallback = std::function<bool(const String &, ColumnDependency::Kind)>;
/// Returns columns, which will be needed to calculate dependencies (skip indices, projections,
/// TTL expressions) if we update @updated_columns set of columns.
ColumnDependencies getColumnDependencies(
const NameSet & updated_columns,
bool include_ttl_target,
const std::function<bool(const String & file_name)> & has_indice_or_projection) const;
const HasDependencyCallback & has_dependency) const;
/// Block with ordinary + materialized columns.
Block getSampleBlock() const;

View File

@ -885,7 +885,13 @@ tupleDivide
tupleDivideByNumber
tupleElement
tupleHammingDistance
tupleIntDiv
tupleIntDivByNumber
tupleIntDivOrZero
tupleIntDivOrZeroByNumber
tupleMinus
tupleModulo
tupleModuloByNumber
tupleMultiply
tupleMultiplyByNumber
tupleNegate

View File

@ -0,0 +1,6 @@
2
0
3355402240
3355402240
3321851904
3321851904

View File

@ -0,0 +1,26 @@
-- Checks that ALTER ... DELETE keeps skip indices and projections consistent with the data.
-- mutations_sync = 2 is set so that the ALTER statements below are applied before the following SELECTs run.
set mutations_sync = 2;
-- Part 1: table with a minmax skip index on y.
drop table if exists t_delete_skip_index;
create table t_delete_skip_index (x UInt32, y String, index i y type minmax granularity 3) engine = MergeTree order by tuple();
insert into t_delete_skip_index select number, toString(number) from numbers(8192 * 10);
-- Rows with y in (4, 5) have x < 8192, so they match before the delete and must be gone after it.
select count() from t_delete_skip_index where y in (4, 5);
alter table t_delete_skip_index delete where x < 8192;
select count() from t_delete_skip_index where y in (4, 5);
drop table if exists t_delete_skip_index;
-- Part 2: table with an aggregating projection p (sum(y)).
drop table if exists t_delete_projection;
create table t_delete_projection (x UInt32, y UInt64, projection p (select sum(y))) engine = MergeTree order by tuple();
insert into t_delete_projection select number, toString(number) from numbers(8192 * 10);
-- The same sum is queried under two settings combinations, before and after the delete;
-- both variants are expected to return the same value.
-- NOTE(review): both variants set optimize_use_projections = 0; confirm whether the second
-- one was meant to enable projections together with force_optimize_projection = 1.
select sum(y) from t_delete_projection settings optimize_use_projections = 0;
select sum(y) from t_delete_projection settings optimize_use_projections = 0, force_optimize_projection = 1;
alter table t_delete_projection delete where x < 8192;
select sum(y) from t_delete_projection settings optimize_use_projections = 0;
select sum(y) from t_delete_projection settings optimize_use_projections = 0, force_optimize_projection = 1;
drop table if exists t_delete_projection;

View File

@ -0,0 +1,4 @@
(1,0)
(2,2)
(2,2)
(0,0)

View File

@ -0,0 +1,4 @@
-- Tuple-by-number arithmetic: modulo, intDiv and intDivOrZero applied to the tuple (5,4).
SELECT (5,4) % 2;
SELECT intDiv((5,4), 2);
SELECT intDivOrZero((5,4), 2);
-- Division by zero: intDivOrZero is used here with a zero divisor.
SELECT intDivOrZero((5,4), 0);