Merge branch 'master' into remove-check-of-config-file-sizes

This commit is contained in:
Igor Nikonov 2023-09-20 18:40:03 +02:00 committed by GitHub
commit b5a6c85981
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
73 changed files with 1064 additions and 228 deletions

View File

@ -203,9 +203,10 @@ Parameter `NumTrees` is the number of trees which the algorithm creates (default
more accurate search results but slower index creation / query times (approximately linearly) as well as larger index sizes.
:::note
Indexes over columns of type `Array` will generally work faster than indexes on `Tuple` columns. All arrays **must** have same length. Use
[CONSTRAINT](/docs/en/sql-reference/statements/create/table.md#constraints) to avoid errors. For example, `CONSTRAINT constraint_name_1
CHECK length(vectors) = 256`.
Indexes over columns of type `Array` will generally work faster than indexes on `Tuple` columns. All arrays must have same length. To avoid
errors, you can use a [CONSTRAINT](/docs/en/sql-reference/statements/create/table.md#constraints), for example, `CONSTRAINT
constraint_name_1 CHECK length(vectors) = 256`. Also, unspecified `Array` values in INSERT statements (i.e. default values) are not
supported.
:::
Setting `annoy_index_search_k_nodes` (default: `NumTrees * LIMIT`) determines how many tree nodes are inspected during SELECTs. Larger
@ -223,6 +224,7 @@ SETTINGS annoy_index_search_k_nodes=100;
The Annoy index currently does not work with per-table, non-default `index_granularity` settings (see
[here](https://github.com/ClickHouse/ClickHouse/pull/51325#issuecomment-1605920475)). If necessary, the value must be changed in config.xml.
:::
## USearch {#usearch}
This type of ANN index is based on the [the USearch library](https://github.com/unum-cloud/usearch), which implements the [HNSW

View File

@ -1354,3 +1354,4 @@ In this sample configuration:
- `_part_uuid` — Unique part identifier (if enabled MergeTree setting `assign_part_uuids`).
- `_partition_value` — Values (a tuple) of a `partition by` expression.
- `_sample_factor` — Sample factor (from the query).
- `_block_number` — Block number of the row, it is persisted on merges when `allow_experimental_block_number_column` is set to true.

View File

@ -142,7 +142,7 @@ As a result, the query cache stores for each query multiple (partial)
result blocks. While this behavior is a good default, it can be suppressed using setting
[query_cache_squash_partial_results](settings/settings.md#query-cache-squash-partial-results).
Also, results of queries with non-deterministic functions are not cached. Such functions include
Also, results of queries with non-deterministic functions are not cached by default. Such functions include
- functions for accessing dictionaries: [`dictGet()`](../sql-reference/functions/ext-dict-functions.md#dictGet) etc.
- [user-defined functions](../sql-reference/statements/create/function.md),
- functions which return the current date or time: [`now()`](../sql-reference/functions/date-time-functions.md#now),
@ -158,7 +158,7 @@ Also, results of queries with non-deterministic functions are not cached. Such f
- functions which depend on the environment: [`currentUser()`](../sql-reference/functions/other-functions.md#currentUser),
[`queryID()`](../sql-reference/functions/other-functions.md#queryID),
[`getMacro()`](../sql-reference/functions/other-functions.md#getMacro) etc.
Caching of non-deterministic functions can be forced regardless using setting
To force caching of results of queries with non-deterministic functions regardless, use setting
[query_cache_store_results_of_queries_with_nondeterministic_functions](settings/settings.md#query-cache-store-results-of-queries-with-nondeterministic-functions).
Finally, entries in the query cache are not shared between users due to security reasons. For example, user A must not be able to bypass a

View File

@ -854,3 +854,9 @@ Possible values:
- `Always` or `Never`.
Default value: `Never`
## allow_experimental_block_number_column
Persists virtual column `_block_number` on merges.
Default value: false.

View File

@ -4,7 +4,7 @@ sidebar_position: 52
sidebar_label: Array(T)
---
# Array(t)
# Array(T)
An array of `T`-type items, with the starting array index as 1. `T` can be any data type, including an array.

View File

@ -32,6 +32,8 @@
#include <Common/randomSeed.h>
#include <Common/ThreadPool.h>
#include <Loggers/Loggers.h>
#include <Loggers/OwnFormattingChannel.h>
#include <Loggers/OwnPatternFormatter.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
@ -599,7 +601,9 @@ void LocalServer::processConfig()
{
auto poco_logs_level = Poco::Logger::parseLevel(level);
Poco::Logger::root().setLevel(poco_logs_level);
Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::SimpleFileChannel>(new Poco::SimpleFileChannel(server_logs_file)));
Poco::AutoPtr<OwnPatternFormatter> pf = new OwnPatternFormatter;
Poco::AutoPtr<OwnFormattingChannel> log = new OwnFormattingChannel(pf, new Poco::SimpleFileChannel(server_logs_file));
Poco::Logger::root().setChannel(log);
logging_initialized = true;
}
else if (logging || is_interactive)

View File

@ -585,6 +585,8 @@
M(700, USER_SESSION_LIMIT_EXCEEDED) \
M(701, CLUSTER_DOESNT_EXIST) \
M(702, CLIENT_INFO_DOES_NOT_MATCH) \
M(703, INVALID_IDENTIFIER) \
M(704, CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

231
src/DataTypes/Utils.cpp Normal file
View File

@ -0,0 +1,231 @@
#include <DataTypes/Utils.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
namespace DB
{
bool canBeSafelyCasted(const DataTypePtr & from_type, const DataTypePtr & to_type)
{
auto from_which_type = WhichDataType(from_type->getTypeId());
bool to_type_was_nullable = isNullableOrLowCardinalityNullable(to_type);
auto to_type_unwrapped = removeNullable(removeLowCardinality(to_type));
if (from_type->equals(*to_type_unwrapped))
return true;
auto to_which_type = WhichDataType(to_type_unwrapped->getTypeId());
switch (from_which_type.idx)
{
case TypeIndex::UInt8:
case TypeIndex::UInt16:
case TypeIndex::UInt32:
case TypeIndex::UInt64:
case TypeIndex::UInt128:
case TypeIndex::UInt256:
{
if (to_which_type.isUInt() &&
to_type_unwrapped->getSizeOfValueInMemory() >= from_type->getSizeOfValueInMemory())
return true;
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::Int8:
case TypeIndex::Int16:
case TypeIndex::Int32:
case TypeIndex::Int64:
case TypeIndex::Int128:
case TypeIndex::Int256:
{
if (to_which_type.isInt() &&
to_type_unwrapped->getSizeOfValueInMemory() >= from_type->getSizeOfValueInMemory())
return true;
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::Float32:
{
if (to_which_type.isFloat64() || to_which_type.isString())
return true;
return false;
}
case TypeIndex::Float64:
case TypeIndex::Date:
case TypeIndex::Date32:
case TypeIndex::DateTime:
case TypeIndex::DateTime64:
case TypeIndex::FixedString:
case TypeIndex::Enum8:
case TypeIndex::Enum16:
case TypeIndex::IPv6:
{
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::Decimal32:
case TypeIndex::Decimal64:
case TypeIndex::Decimal128:
case TypeIndex::Decimal256:
{
if (to_which_type.isDecimal())
{
auto from_type_decimal_precision = getDecimalPrecision(*from_type);
auto to_type_decimal_precision = getDecimalPrecision(*to_type_unwrapped);
if (from_type_decimal_precision > to_type_decimal_precision)
return false;
auto from_type_decimal_scale = getDecimalScale(*from_type);
auto to_type_decimal_scale = getDecimalScale(*to_type_unwrapped);
if (from_type_decimal_scale > to_type_decimal_scale)
return false;
return true;
}
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::UUID:
{
if (to_which_type.isUInt128() || to_which_type.isString())
return true;
return false;
}
case TypeIndex::IPv4:
{
if (to_which_type.isUInt32() || to_which_type.isUInt64() || to_which_type.isString())
return true;
return false;
}
case TypeIndex::Nullable:
{
if (to_type_was_nullable)
{
const auto & from_type_nullable = assert_cast<const DataTypeNullable &>(*from_type);
return canBeSafelyCasted(from_type_nullable.getNestedType(), to_type_unwrapped);
}
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::LowCardinality:
{
const auto & from_type_low_cardinality = assert_cast<const DataTypeLowCardinality &>(*from_type);
return canBeSafelyCasted(from_type_low_cardinality.getDictionaryType(), to_type_unwrapped);
}
case TypeIndex::Array:
{
if (to_which_type.isArray())
{
const auto & from_type_array = assert_cast<const DataTypeArray &>(*from_type);
const auto & to_type_array = assert_cast<const DataTypeArray &>(*to_type_unwrapped);
return canBeSafelyCasted(from_type_array.getNestedType(), to_type_array.getNestedType());
}
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::Map:
{
if (to_which_type.isMap())
{
const auto & from_type_map = assert_cast<const DataTypeMap &>(*from_type);
const auto & to_type_map = assert_cast<const DataTypeMap &>(*to_type_unwrapped);
if (!canBeSafelyCasted(from_type_map.getKeyType(), to_type_map.getKeyType()))
return false;
if (!canBeSafelyCasted(from_type_map.getValueType(), to_type_map.getValueType()))
return false;
return true;
}
if (to_which_type.isArray())
{
// Map nested type is Array(Tuple(key_type, value_type))
const auto & from_type_map = assert_cast<const DataTypeMap &>(*from_type);
const auto & to_type_array = assert_cast<const DataTypeArray &>(*to_type_unwrapped);
const auto * to_type_nested_tuple_type = typeid_cast<const DataTypeTuple *>(to_type_array.getNestedType().get());
if (!to_type_nested_tuple_type)
return false;
const auto & to_type_tuple_elements = to_type_nested_tuple_type->getElements();
if (to_type_tuple_elements.size() != 2)
return false;
if (!canBeSafelyCasted(from_type_map.getKeyType(), to_type_tuple_elements[0]))
return false;
if (!canBeSafelyCasted(from_type_map.getValueType(), to_type_tuple_elements[1]))
return false;
return true;
}
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::Tuple:
{
if (to_which_type.isTuple())
{
const auto & from_type_tuple = assert_cast<const DataTypeTuple &>(*from_type);
const auto & to_type_tuple = assert_cast<const DataTypeTuple &>(*to_type_unwrapped);
const auto & from_tuple_type_elements = from_type_tuple.getElements();
const auto & to_tuple_type_elements = to_type_tuple.getElements();
size_t lhs_type_elements_size = from_tuple_type_elements.size();
if (lhs_type_elements_size != to_tuple_type_elements.size())
return false;
for (size_t i = 0; i < lhs_type_elements_size; ++i)
if (!canBeSafelyCasted(from_tuple_type_elements[i], to_tuple_type_elements[i]))
return false;
return true;
}
if (to_which_type.isString())
return true;
return false;
}
case TypeIndex::String:
case TypeIndex::Object:
case TypeIndex::Set:
case TypeIndex::Interval:
case TypeIndex::Function:
case TypeIndex::AggregateFunction:
case TypeIndex::Nothing:
return false;
}
return true;
}
}

19
src/DataTypes/Utils.h Normal file
View File

@ -0,0 +1,19 @@
#pragma once
#include <DataTypes/IDataType.h>
namespace DB
{
/** Returns true if from_type can be safely casted to to_type.
*
* Examples:
* From type UInt8 to type UInt16 returns true.
* From type UInt16 to type UInt8 returns false.
* From type String to type LowCardinality(String) returns true.
* From type LowCardinality(String) to type String returns true.
* From type String to type UInt8 returns false.
*/
bool canBeSafelyCasted(const DataTypePtr & from_type, const DataTypePtr & to_type);
}

View File

@ -6,7 +6,6 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnLowCardinality.h>
#include <Common/assert_cast.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB

View File

@ -199,6 +199,7 @@ public:
/// Parse JSON for every row
Impl impl;
GeneratorJSONPath<JSONParser> generator_json_path(res);
for (const auto i : collections::range(0, input_rows_count))
{
std::string_view json{
@ -208,7 +209,9 @@ public:
bool added_to_column = false;
if (document_ok)
{
added_to_column = impl.insertResultToColumn(*to, document, res, context);
/// Instead of creating a new generator for each row, we can reuse the same one.
generator_json_path.reinitialize();
added_to_column = impl.insertResultToColumn(*to, document, generator_json_path, context);
}
if (!added_to_column)
{
@ -287,9 +290,8 @@ public:
static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }
static bool insertResultToColumn(IColumn & dest, const Element & root, ASTPtr & query_ptr, const ContextPtr &)
static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath<JSONParser> & generator_json_path, const ContextPtr &)
{
GeneratorJSONPath<JSONParser> generator_json_path(query_ptr);
Element current_element = root;
VisitorStatus status;
while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
@ -337,9 +339,8 @@ public:
static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }
static bool insertResultToColumn(IColumn & dest, const Element & root, ASTPtr & query_ptr, const ContextPtr & context)
static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath<JSONParser> & generator_json_path, const ContextPtr & context)
{
GeneratorJSONPath<JSONParser> generator_json_path(query_ptr);
Element current_element = root;
VisitorStatus status;
@ -405,11 +406,10 @@ public:
static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }
static bool insertResultToColumn(IColumn & dest, const Element & root, ASTPtr & query_ptr, const ContextPtr &)
static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath<JSONParser> & generator_json_path, const ContextPtr &)
{
ColumnString & col_str = assert_cast<ColumnString &>(dest);
GeneratorJSONPath<JSONParser> generator_json_path(query_ptr);
Element current_element = root;
VisitorStatus status;
bool success = false;

View File

@ -105,6 +105,16 @@ public:
}
}
void reinitialize()
{
while (current_visitor >= 0)
{
visitors[current_visitor]->reinitialize();
current_visitor--;
}
current_visitor = 0;
}
private:
bool updateVisitorsForNextRun()
{

View File

@ -321,7 +321,7 @@ namespace
/// To avoid such a deadlock we unlock `lock` before entering `pool_ptr->second->get`.
lock.unlock();
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
auto retry_timeout = timeouts.connection_timeout.totalMilliseconds();
auto session = pool_ptr->second->get(retry_timeout);
setTimeouts(*session, timeouts);

View File

@ -16,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_IDENTIFIER;
}
DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTTableIdentifier & identifier, const String & current_database)
@ -37,7 +38,7 @@ DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTIdentifier & ident
else if (identifier.name_parts.size() == 1)
table = identifier.name_parts[0];
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: invalid identifier");
throw Exception(ErrorCodes::INVALID_IDENTIFIER, "Invalid identifier");
if (database.empty())
database = current_database;
@ -50,7 +51,7 @@ DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTPtr & node, const
else if (const auto * identifier = node->as<ASTIdentifier>())
*this = DatabaseAndTableWithAlias(*identifier, current_database);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: identifier or table identifier expected");
throw Exception(ErrorCodes::INVALID_IDENTIFIER, "Identifier or table identifier expected");
}
DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database)

View File

@ -31,6 +31,7 @@
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/WindowView/StorageWindowView.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/BlockNumberColumn.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
@ -95,7 +96,6 @@ namespace ErrorCodes
extern const int SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY;
extern const int ILLEGAL_SYNTAX_FOR_DATA_TYPE;
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_INDEX;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_DATABASE;
extern const int PATH_ACCESS_DENIED;
@ -698,8 +698,6 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
for (const auto & index : create.columns_list->indices->children)
{
IndexDescription index_desc = IndexDescription::getIndexFromAST(index->clone(), properties.columns, getContext());
if (properties.indices.has(index_desc.name))
throw Exception(ErrorCodes::ILLEGAL_INDEX, "Duplicated index name {}", backQuoteIfNeed(index_desc.name));
const auto & settings = getContext()->getSettingsRef();
if (index_desc.type == INVERTED_INDEX_NAME && !settings.allow_experimental_inverted_index)
{
@ -714,7 +712,6 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
properties.indices.push_back(index_desc);
}
if (create.columns_list->projections)
for (const auto & projection_ast : create.columns_list->projections->children)
{
@ -837,6 +834,13 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
"Cannot create table with column '{}' for *MergeTree engines because it "
"is reserved for lightweight delete feature",
LightweightDeleteDescription::FILTER_COLUMN.name);
auto search_block_number = all_columns.find(BlockNumberColumn::name);
if (search_block_number != all_columns.end())
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Cannot create table with column '{}' for *MergeTree engines because it "
"is reserved for storing block number",
BlockNumberColumn::name);
}
const auto & settings = getContext()->getSettingsRef();

View File

@ -7,6 +7,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/BlockNumberColumn.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
@ -40,7 +41,6 @@
#include <Parsers/makeASTForLogicalFunction.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -56,6 +56,7 @@ namespace ErrorCodes
extern const int THERE_IS_NO_COLUMN;
}
namespace
{
@ -416,6 +417,12 @@ static void validateUpdateColumns(
found = true;
}
/// Dont allow to override value of block number virtual column
if (!found && column_name == BlockNumberColumn::name)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Update is not supported for virtual column {} ", backQuote(column_name));
}
if (!found)
{
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
@ -511,7 +518,8 @@ void MutationsInterpreter::prepare(bool dry_run)
for (const auto & [name, _] : command.column_to_update_expression)
{
if (!available_columns_set.contains(name) && name != LightweightDeleteDescription::FILTER_COLUMN.name)
if (!available_columns_set.contains(name) && name != LightweightDeleteDescription::FILTER_COLUMN.name
&& name != BlockNumberColumn::name)
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
"Column {} is updated but not requested to read", name);
@ -613,6 +621,8 @@ void MutationsInterpreter::prepare(bool dry_run)
type = physical_column->type;
else if (column == LightweightDeleteDescription::FILTER_COLUMN.name)
type = LightweightDeleteDescription::FILTER_COLUMN.type;
else if (column == BlockNumberColumn::name)
type = BlockNumberColumn::type;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column);
@ -1087,6 +1097,18 @@ struct VirtualColumns
virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i});
}
else if (columns_to_read[i] == BlockNumberColumn::name)
{
if (!part->getColumns().contains(BlockNumberColumn::name))
{
ColumnWithTypeAndName block_number_column;
block_number_column.type = BlockNumberColumn::type;
block_number_column.column = block_number_column.type->createColumnConst(0, part->info.min_block);
block_number_column.name = std::move(columns_to_read[i]);
virtuals.emplace_back(ColumnAndPosition{.column = std::move(block_number_column), .position = i});
}
}
}
if (!virtuals.empty())

View File

@ -48,7 +48,7 @@ static bool equals(const DataTypes & lhs, const DataTypes & rhs)
FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {}
SetPtr FutureSetFromStorage::get() const { return set; }
const DataTypes & FutureSetFromStorage::getTypes() const { return set->getElementsTypes(); }
DataTypes FutureSetFromStorage::getTypes() const { return set->getElementsTypes(); }
SetPtr FutureSetFromStorage::buildOrderedSetInplace(const ContextPtr &)
{
@ -73,7 +73,7 @@ FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
set->finishInsert();
}
const DataTypes & FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
DataTypes FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
{
@ -138,7 +138,7 @@ void FutureSetFromSubquery::setQueryPlan(std::unique_ptr<QueryPlan> source_)
set_and_key->set->setHeader(source->getCurrentDataStream().header.getColumnsWithTypeAndName());
}
const DataTypes & FutureSetFromSubquery::getTypes() const
DataTypes FutureSetFromSubquery::getTypes() const
{
return set_and_key->set->getElementsTypes();
}
@ -183,7 +183,10 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
{
auto set = external_table_set->buildOrderedSetInplace(context);
if (set)
return set_and_key->set = set;
{
set_and_key->set = set;
return set_and_key->set;
}
}
auto plan = build(context);

View File

@ -47,7 +47,7 @@ public:
/// Returns set if set is ready (created and filled) or nullptr if not.
virtual SetPtr get() const = 0;
/// Returns set->getElementsTypes(), even if set is not created yet.
virtual const DataTypes & getTypes() const = 0;
virtual DataTypes getTypes() const = 0;
/// If possible, return set with stored elements useful for PK analysis.
virtual SetPtr buildOrderedSetInplace(const ContextPtr & context) = 0;
};
@ -62,7 +62,7 @@ public:
FutureSetFromStorage(SetPtr set_);
SetPtr get() const override;
const DataTypes & getTypes() const override;
DataTypes getTypes() const override;
SetPtr buildOrderedSetInplace(const ContextPtr &) override;
private:
@ -79,7 +79,7 @@ public:
SetPtr get() const override { return set; }
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
const DataTypes & getTypes() const override;
DataTypes getTypes() const override;
private:
SetPtr set;
@ -105,7 +105,7 @@ public:
const Settings & settings);
SetPtr get() const override;
const DataTypes & getTypes() const override;
DataTypes getTypes() const override;
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
std::unique_ptr<QueryPlan> build(const ContextPtr & context);

View File

@ -94,11 +94,12 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS;
extern const int INTO_OUTFILE_NOT_ALLOWED;
extern const int QUERY_WAS_CANCELLED;
extern const int INVALID_TRANSACTION;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_WAS_CANCELLED;
}
@ -991,7 +992,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!async_insert)
{
/// If it is a non-internal SELECT, and passive/read use of the query cache is enabled, and the cache knows the query, then set
/// If it is a non-internal SELECT, and passive (read) use of the query cache is enabled, and the cache knows the query, then set
/// a pipeline with a source populated by the query cache.
auto get_result_from_query_cache = [&]()
{
@ -1091,11 +1092,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
res = interpreter->execute();
/// If it is a non-internal SELECT query, and active/write use of the query cache is enabled, then add a processor on
/// If it is a non-internal SELECT query, and active (write) use of the query cache is enabled, then add a processor on
/// top of the pipeline which stores the result in the query cache.
if (can_use_query_cache && settings.enable_writes_to_query_cache
&& (!astContainsNonDeterministicFunctions(ast, context) || settings.query_cache_store_results_of_queries_with_nondeterministic_functions))
if (can_use_query_cache && settings.enable_writes_to_query_cache)
{
if (astContainsNonDeterministicFunctions(ast, context) && !settings.query_cache_store_results_of_queries_with_nondeterministic_functions)
throw Exception(ErrorCodes::CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS,
"Unable to cache the query result because the query contains a non-deterministic function. Use setting query_cache_store_results_of_queries_with_nondeterministic_functions = 1 to store the query result regardless.");
QueryCache::Key key(
ast, res.pipeline.getHeader(),
context->getUserName(), settings.query_cache_share_between_users,

View File

@ -20,6 +20,7 @@
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/BlockNumberColumn.h>
namespace DB
@ -260,7 +261,7 @@ void fillMissingColumns(
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot)
StorageMetadataPtr metadata_snapshot, size_t block_number)
{
size_t num_columns = requested_columns.size();
if (num_columns != res_columns.size())
@ -339,9 +340,14 @@ void fillMissingColumns(
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
if (requested_column->name == BlockNumberColumn::name)
res_columns[i] = type->createColumnConst(num_rows, block_number)->convertToFullColumnIfConst();
else
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
}
}
}

View File

@ -46,6 +46,6 @@ void fillMissingColumns(
const NamesAndTypesList & requested_columns,
const NamesAndTypesList & available_columns,
const NameSet & partially_read_columns,
StorageMetadataPtr metadata_snapshot);
StorageMetadataPtr metadata_snapshot, size_t block_number = 0);
}

View File

@ -130,7 +130,7 @@ static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
const FormatSettings & format_settings,
std::atomic<int> & is_stopped)
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
}
@ -495,12 +495,15 @@ NamesAndTypesList ParquetMetadataSchemaReader::readSchema()
void registerInputFormatParquetMetadata(FormatFactory & factory)
{
factory.registerInputFormat(
factory.registerRandomAccessInputFormat(
"ParquetMetadata",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
[](ReadBuffer & buf,
const Block & sample,
const FormatSettings & settings,
const ReadSettings &,
bool /* is_remote_fs */,
size_t /* max_download_threads */,
size_t /* max_parsing_threads */)
{
return std::make_shared<ParquetMetadataInputFormat>(buf, sample, settings);
});

View File

@ -28,7 +28,6 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
, cleanup(cleanup_)
, cleanedup_rows_count(cleanedup_rows_count_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
if (!version_column.empty())
@ -83,8 +82,11 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
else if (cleanup && cleanedup_rows_count != nullptr)
{
*cleanedup_rows_count += current_row_sources.size();
current_row_sources.resize(0);
}
}
else
insertRow();
@ -141,8 +143,11 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
if (!cleanup || !value)
insertRow();
else if (cleanedup_rows_count != nullptr)
else if (cleanup && cleanedup_rows_count != nullptr)
{
*cleanedup_rows_count += current_row_sources.size();
current_row_sources.resize(0);
}
}
else
insertRow();

View File

@ -12,6 +12,7 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <IO/WriteHelpers.h>
#include <Storages/BlockNumberColumn.h>
namespace DB
@ -222,6 +223,12 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
const auto * simple = dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName());
if (column.name == BlockNumberColumn::name)
{
def.column_numbers_not_to_aggregate.push_back(i);
continue;
}
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()) && !simple)
{

View File

@ -49,7 +49,8 @@ TTLTransform::TTLTransform(
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_));
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_,
getInputPort().getHeader(), storage_));
if (metadata_snapshot_->hasAnyColumnTTL())
{

View File

@ -27,6 +27,7 @@
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/BlockNumberColumn.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/typeid_cast.h>
#include <Common/randomSeed.h>
@ -782,7 +783,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
/// Drop alias is metadata alter, in other case mutation is required.
if (type == DROP_COLUMN)
return metadata.columns.hasColumnOrNested(GetColumnsOptions::AllPhysical, column_name) ||
column_name == LightweightDeleteDescription::FILTER_COLUMN.name;
column_name == LightweightDeleteDescription::FILTER_COLUMN.name || column_name == BlockNumberColumn::name;
if (type != MODIFY_COLUMN || data_type == nullptr)
return false;
@ -1066,6 +1067,10 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot add column {}: "
"this column name is reserved for lightweight delete feature", backQuote(column_name));
if (column_name == BlockNumberColumn::name && std::dynamic_pointer_cast<MergeTreeData>(table))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot add column {}: "
"this column name is reserved for _block_number persisting feature", backQuote(column_name));
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
@ -1270,6 +1275,10 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot rename to {}: "
"this column name is reserved for lightweight delete feature", backQuote(command.rename_to));
if (command.rename_to == BlockNumberColumn::name && std::dynamic_pointer_cast<MergeTreeData>(table))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot rename to {}: "
"this column name is reserved for _block_number persisting feature", backQuote(command.rename_to));
if (modified_columns.contains(column_name))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot rename and modify the same column {} "
"in a single ALTER query", backQuote(column_name));

View File

@ -0,0 +1,23 @@
#include <Storages/BlockNumberColumn.h>
#include <Compression/CompressionCodecMultiple.h>
namespace DB
{
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
CompressionCodecPtr getCompressionCodecForBlockNumberColumn()
{
std::vector <CompressionCodecPtr> codecs;
codecs.reserve(2);
auto data_bytes_size = BlockNumberColumn::type->getSizeOfValueInMemory();
codecs.emplace_back(getCompressionCodecDelta(data_bytes_size));
codecs.emplace_back(CompressionCodecFactory::instance().get("LZ4", {}));
return std::make_shared<CompressionCodecMultiple>(codecs);
}
const String BlockNumberColumn::name = "_block_number";
const DataTypePtr BlockNumberColumn::type = std::make_shared<DataTypeUInt64>();
const CompressionCodecPtr BlockNumberColumn::compression_codec = getCompressionCodecForBlockNumberColumn();
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypesNumber.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
struct BlockNumberColumn
{
static const String name;
static const DataTypePtr type;
static const CompressionCodecPtr compression_codec;
};
}

View File

@ -30,11 +30,15 @@
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Storages/BlockNumberColumn.h>
namespace DB
{
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
@ -721,11 +725,13 @@ CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_
CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name) const
{
assert (column_name != BlockNumberColumn::name);
return getCodecOrDefault(column_name, CompressionCodecFactory::instance().getDefaultCodec());
}
ASTPtr ColumnsDescription::getCodecDescOrDefault(const String & column_name, CompressionCodecPtr default_codec) const
{
assert (column_name != BlockNumberColumn::name);
const auto it = columns.get<1>().find(column_name);
if (it == columns.get<1>().end() || !it->codec)

View File

@ -477,10 +477,6 @@ public:
/// Moar hardening: this method is supposed to be used for debug assertions
bool assertHasValidVersionMetadata() const;
/// Return hardlink count for part.
/// Required for keep data on remote FS when part has shadow copies.
UInt32 getNumberOfRefereneces() const;
/// True if the part supports lightweight delete mutate.
bool supportLightweightDeleteMutate() const;

View File

@ -62,7 +62,7 @@ const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints()
return avg_value_size_hints;
}
void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows) const
void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows, size_t block_number) const
{
try
{
@ -71,7 +71,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
res_columns, num_rows,
Nested::convertToSubcolumns(requested_columns),
Nested::convertToSubcolumns(available_columns),
partially_read_columns, storage_snapshot->metadata);
partially_read_columns, storage_snapshot->metadata, block_number);
should_evaluate_missing_defaults = std::any_of(
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });

View File

@ -45,7 +45,7 @@ public:
/// Add columns from ordered_names that are not present in the block.
/// Missing columns are added in the order specified by ordered_names.
/// num_rows is needed in case if all res_columns are nullptr.
void fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows) const;
void fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows, size_t block_number = 0) const;
/// Evaluate defaulted columns if necessary.
void evaluateMissingDefaults(Block additional_columns, Columns & res_columns) const;

View File

@ -3,6 +3,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/Utils.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
@ -1258,10 +1259,18 @@ bool KeyCondition::tryPrepareSetIndex(
const auto right_arg = func.getArgumentAt(1);
auto future_set = right_arg.tryGetPreparedSet(indexes_mapping, data_types);
auto future_set = right_arg.tryGetPreparedSet();
if (!future_set)
return false;
const auto set_types = future_set->getTypes();
size_t set_types_size = set_types.size();
size_t indexes_mapping_size = indexes_mapping.size();
for (auto & index_mapping : indexes_mapping)
if (index_mapping.tuple_index >= set_types_size)
return false;
auto prepared_set = future_set->buildOrderedSetInplace(right_arg.getTreeContext().getQueryContext());
if (!prepared_set)
return false;
@ -1270,11 +1279,72 @@ bool KeyCondition::tryPrepareSetIndex(
if (!prepared_set->hasExplicitSetElements())
return false;
prepared_set->checkColumnsNumber(left_args_count);
for (size_t i = 0; i < indexes_mapping.size(); ++i)
prepared_set->checkTypesEqual(indexes_mapping[i].tuple_index, data_types[i]);
/** Try to convert set columns to primary key columns.
* Example: SELECT id FROM test_table WHERE id IN (SELECT 1);
* In this example table `id` column has type UInt64, Set column has type UInt8. To use index
* we need to convert set column to primary key column.
*/
auto set_columns = prepared_set->getSetElements();
assert(set_types_size == set_columns.size());
out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping));
for (size_t indexes_mapping_index = 0; indexes_mapping_index < indexes_mapping_size; ++indexes_mapping_index)
{
const auto & key_column_type = data_types[indexes_mapping_index];
size_t set_element_index = indexes_mapping[indexes_mapping_index].tuple_index;
auto set_element_type = set_types[set_element_index];
auto set_column = set_columns[set_element_index];
if (canBeSafelyCasted(set_element_type, key_column_type))
{
set_columns[set_element_index] = castColumn({set_column, set_element_type, {}}, key_column_type);
continue;
}
if (!key_column_type->canBeInsideNullable())
return false;
const NullMap * set_column_null_map = nullptr;
if (isNullableOrLowCardinalityNullable(set_element_type))
{
if (WhichDataType(set_element_type).isLowCardinality())
{
set_element_type = removeLowCardinality(set_element_type);
set_column = set_column->convertToFullColumnIfLowCardinality();
}
set_element_type = removeNullable(set_element_type);
const auto & set_column_nullable = assert_cast<const ColumnNullable &>(*set_column);
set_column_null_map = &set_column_nullable.getNullMapData();
set_column = set_column_nullable.getNestedColumnPtr();
}
auto nullable_set_column = castColumnAccurateOrNull({set_column, set_element_type, {}}, key_column_type);
const auto & nullable_set_column_typed = assert_cast<const ColumnNullable &>(*nullable_set_column);
const auto & nullable_set_column_null_map = nullable_set_column_typed.getNullMapData();
size_t nullable_set_column_null_map_size = nullable_set_column_null_map.size();
IColumn::Filter filter(nullable_set_column_null_map_size);
if (set_column_null_map)
{
for (size_t i = 0; i < nullable_set_column_null_map_size; ++i)
filter[i] = (*set_column_null_map)[i] || !nullable_set_column_null_map[i];
set_column = nullable_set_column_typed.filter(filter, 0);
}
else
{
for (size_t i = 0; i < nullable_set_column_null_map_size; ++i)
filter[i] = !nullable_set_column_null_map[i];
set_column = nullable_set_column_typed.getNestedColumn().filter(filter, 0);
}
set_columns[set_element_index] = std::move(set_column);
}
out.set_index = std::make_shared<MergeTreeSetIndex>(set_columns, std::move(indexes_mapping));
return true;
}

View File

@ -218,6 +218,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->need_remove_expired_values = false;
ctx->force_ttl = false;
if (supportsBlockNumberColumn(global_ctx) && !global_ctx->storage_columns.contains(BlockNumberColumn::name))
{
global_ctx->storage_columns.emplace_back(NameAndTypePair{BlockNumberColumn::name,BlockNumberColumn::type});
global_ctx->all_column_names.emplace_back(BlockNumberColumn::name);
global_ctx->gathering_columns.emplace_back(NameAndTypePair{BlockNumberColumn::name,BlockNumberColumn::type});
global_ctx->gathering_column_names.emplace_back(BlockNumberColumn::name);
}
SerializationInfo::Settings info_settings =
{
.ratio_of_defaults_for_sparse = global_ctx->data->getSettings()->ratio_of_defaults_for_sparse_serialization,
@ -251,12 +259,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
}
}
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos, global_ctx->metadata_snapshot->getMetadataVersion());
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)
ctx->need_remove_expired_values = true;
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos, global_ctx->metadata_snapshot->getMetadataVersion());
if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled())
{
LOG_INFO(ctx->log, "Part {} has values with expired TTL, but merges with TTL are cancelled.", global_ctx->new_data_part->name);
@ -998,6 +1006,17 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
if (global_ctx->deduplicate)
{
/// We don't want to deduplicate by block number column
/// so if deduplicate_by_columns is empty, add all columns except _block_number
if (supportsBlockNumberColumn(global_ctx) && global_ctx->deduplicate_by_columns.empty())
{
for (const auto & col : global_ctx->merging_column_names)
{
if (col != BlockNumberColumn::name)
global_ctx->deduplicate_by_columns.emplace_back(col);
}
}
if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns))
res_pipe.addTransform(std::make_shared<DistinctSortedTransform>(
res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));

View File

@ -13,6 +13,7 @@
#include <QueryPipeline/QueryPipeline.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Common/filesystemHelpers.h>
#include <Storages/BlockNumberColumn.h>
#include <memory>
#include <list>
@ -388,6 +389,12 @@ private:
Stages::iterator stages_iterator = stages.begin();
/// Check for persisting block number column
static bool supportsBlockNumberColumn(GlobalRuntimeContextPtr global_ctx)
{
return global_ctx->data->getSettings()->allow_experimental_block_number_column && global_ctx->metadata_snapshot->getGroupByTTLs().empty();
}
};
/// FIXME

View File

@ -78,6 +78,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MutationCommands.h>
#include <Storages/BlockNumberColumn.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <boost/algorithm/string/join.hpp>
@ -3730,7 +3731,7 @@ void MergeTreeData::checkPartDynamicColumns(MutableDataPartPtr & part, DataParts
const auto & part_columns = part->getColumns();
for (const auto & part_column : part_columns)
{
if (part_column.name == LightweightDeleteDescription::FILTER_COLUMN.name)
if (part_column.name == LightweightDeleteDescription::FILTER_COLUMN.name || part_column.name == BlockNumberColumn::name)
continue;
auto storage_column = columns.getPhysical(part_column.name);
@ -8269,6 +8270,7 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_part_offset", std::make_shared<DataTypeUInt64>()),
LightweightDeleteDescription::FILTER_COLUMN,
NameAndTypePair(BlockNumberColumn::name, BlockNumberColumn::type),
};
}

View File

@ -5,6 +5,7 @@
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/BlockNumberColumn.h>
namespace DB
@ -64,6 +65,12 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
ordered_columns_list.sort([this](const auto & lhs, const auto & rhs)
{ return *getColumnPosition(lhs.name) < *getColumnPosition(rhs.name); });
/// _block_number column is not added by user, but is persisted in a part after merge
/// If _block_number is not present in the parts to be merged, then it won't have a position
/// So check if its not present and add it at the end
if (columns_list.contains(BlockNumberColumn::name) && !ordered_columns_list.contains(BlockNumberColumn::name))
ordered_columns_list.emplace_back(NameAndTypePair{BlockNumberColumn::name, BlockNumberColumn::type});
return std::make_unique<MergeTreeDataPartWriterCompact>(
shared_from_this(), ordered_columns_list, metadata_snapshot,
indices_to_recalc, getMarksFileExtension(),

View File

@ -1,9 +1,12 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/BlockNumberColumn.h>
namespace DB
{
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -53,7 +56,14 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
const auto & storage_columns = metadata_snapshot->getColumns();
for (const auto & column : columns_list)
addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
{
ASTPtr compression;
if (column.name == BlockNumberColumn::name)
compression = BlockNumberColumn::compression_codec->getFullCodecDesc();
else
compression = storage_columns.getCodecDescOrDefault(column.name, default_codec);
addStreams(column, compression);
}
}
void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc)

View File

@ -6,9 +6,12 @@
#include <Common/escapeForFileName.h>
#include <Columns/ColumnSparse.h>
#include <Common/logger_useful.h>
#include <Storages/BlockNumberColumn.h>
namespace DB
{
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -87,7 +90,14 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
{
const auto & columns = metadata_snapshot->getColumns();
for (const auto & it : columns_list)
addStreams(it, columns.getCodecDescOrDefault(it.name, default_codec));
{
ASTPtr compression;
if (it.name == BlockNumberColumn::name)
compression = BlockNumberColumn::compression_codec->getFullCodecDesc();
else
compression = columns.getCodecDescOrDefault(it.name, default_codec);
addStreams(it, compression);
}
}
void MergeTreeDataPartWriterWide::addStreams(

View File

@ -46,7 +46,7 @@
#include <Functions/IFunction.h>
#include <IO/WriteBufferFromOStream.h>
#include <Storages/BlockNumberColumn.h>
#include <Storages/MergeTree/ApproximateNearestNeighborIndexesCommon.h>
namespace CurrentMetrics
@ -1232,6 +1232,10 @@ static void selectColumnNames(
{
virt_column_names.push_back(name);
}
else if (name == BlockNumberColumn::name)
{
virt_column_names.push_back(name);
}
else if (name == "_part_uuid")
{
virt_column_names.push_back(name);

View File

@ -154,36 +154,45 @@ void MergeTreeIndexAggregatorAnnoy<Distance>::update(const Block & block, size_t
if (const auto & column_array = typeid_cast<const ColumnArray *>(column_cut.get()))
{
const auto & data = column_array->getData();
const auto & array = typeid_cast<const ColumnFloat32 &>(data).getData();
const auto & column_array_data = column_array->getData();
const auto & column_arary_data_float_data = typeid_cast<const ColumnFloat32 &>(column_array_data).getData();
if (array.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Array has 0 rows, {} rows expected", rows_read);
const auto & column_array_offsets = column_array->getOffsets();
const size_t num_rows = column_array_offsets.size();
const auto & offsets = column_array->getOffsets();
const size_t num_rows = offsets.size();
/// The Annoy algorithm naturally assumes that the indexed vectors have dimension >= 1. This condition is violated if empty arrays
/// are INSERTed into an Annoy-indexed column or if no value was specified at all in which case the arrays take on their default
/// value which is also empty.
if (column_array->isDefaultAt(0))
throw Exception(ErrorCodes::INCORRECT_DATA, "The arrays in column '{}' must not be empty. Did you try to INSERT default values?", index_column_name);
/// Check all sizes are the same
size_t size = offsets[0];
size_t dimension = column_array_offsets[0];
for (size_t i = 0; i < num_rows - 1; ++i)
if (offsets[i + 1] - offsets[i] != size)
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column {} must have equal length", index_column_name);
if (column_array_offsets[i + 1] - column_array_offsets[i] != dimension)
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column '{}' must have equal length", index_column_name);
/// Also check that previously inserted blocks have the same size as this block.
/// Note that this guarantees consistency of dimension only within parts. We are unable to detect inconsistent dimensions across
/// parts - for this, a little help from the user is needed, e.g. CONSTRAINT cnstr CHECK length(array) = 42.
if (index && index->getDimensions() != dimension)
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column '{}' must have equal length", index_column_name);
if (!index)
index = std::make_shared<AnnoyIndexWithSerialization<Distance>>(size);
index = std::make_shared<AnnoyIndexWithSerialization<Distance>>(dimension);
/// Add all rows of block
index->add_item(index->get_n_items(), array.data());
index->add_item(index->get_n_items(), column_arary_data_float_data.data());
for (size_t current_row = 1; current_row < num_rows; ++current_row)
index->add_item(index->get_n_items(), &array[offsets[current_row - 1]]);
index->add_item(index->get_n_items(), &column_arary_data_float_data[column_array_offsets[current_row - 1]]);
}
else if (const auto & column_tuple = typeid_cast<const ColumnTuple *>(column_cut.get()))
{
const auto & columns = column_tuple->getColumns();
const auto & column_tuple_columns = column_tuple->getColumns();
/// TODO check if calling index->add_item() directly on the block's tuples is faster than materializing everything
std::vector<std::vector<Float32>> data{column_tuple->size(), std::vector<Float32>()};
for (const auto & column : columns)
std::vector<std::vector<Float32>> data(column_tuple->size(), std::vector<Float32>());
for (const auto & column : column_tuple_columns)
{
const auto & pod_array = typeid_cast<const ColumnFloat32 *>(column.get())->getData();
for (size_t i = 0; i < pod_array.size(); ++i)
@ -363,7 +372,7 @@ void annoyIndexValidator(const IndexDescription & index, bool /* attach */)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Annoy indexes can only be created on columns of type Array(Float32) and Tuple(Float32)");
"Annoy indexes can only be created on columns of type Array(Float32) and Tuple(Float32[, Float32[, ...]])");
};
DataTypePtr data_type = index.sample_block.getDataTypes()[0];

View File

@ -173,23 +173,32 @@ void MergeTreeIndexAggregatorUSearch<Metric>::update(const Block & block, size_t
if (const auto & column_array = typeid_cast<const ColumnArray *>(column_cut.get()))
{
const auto & data = column_array->getData();
const auto & array = typeid_cast<const ColumnFloat32 &>(data).getData();
const auto & column_array_data = column_array->getData();
const auto & column_array_data_float_data = typeid_cast<const ColumnFloat32 &>(column_array_data).getData();
if (array.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Array has 0 rows, {} rows expected", rows_read);
const auto & column_array_offsets = column_array->getOffsets();
const size_t num_rows = column_array_offsets.size();
const auto & offsets = column_array->getOffsets();
const size_t num_rows = offsets.size();
/// The Usearch algorithm naturally assumes that the indexed vectors have dimension >= 1. This condition is violated if empty arrays
/// are INSERTed into an Usearch-indexed column or if no value was specified at all in which case the arrays take on their default
/// values which is also empty.
if (column_array->isDefaultAt(0))
throw Exception(ErrorCodes::INCORRECT_DATA, "The arrays in column '{}' must not be empty. Did you try to INSERT default values?", index_column_name);
/// Check all sizes are the same
size_t size = offsets[0];
size_t dimension = column_array_offsets[0];
for (size_t i = 0; i < num_rows - 1; ++i)
if (offsets[i + 1] - offsets[i] != size)
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column {} must have equal length", index_column_name);
if (column_array_offsets[i + 1] - column_array_offsets[i] != dimension)
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column '{}' must have equal length", index_column_name);
/// Also check that previously inserted blocks have the same size as this block.
/// Note that this guarantees consistency of dimension only within parts. We are unable to detect inconsistent dimensions across
/// parts - for this, a little help from the user is needed, e.g. CONSTRAINT cnstr CHECK length(array) = 42.
if (index && index->getDimensions() != dimension)
throw Exception(ErrorCodes::INCORRECT_DATA, "All arrays in column '{}' must have equal length", index_column_name);
if (!index)
index = std::make_shared<USearchIndexWithSerialization<Metric>>(size, scalar_kind);
index = std::make_shared<USearchIndexWithSerialization<Metric>>(dimension, scalar_kind);
/// Add all rows of block
if (!index->reserve(unum::usearch::ceil2(index->size() + num_rows)))
@ -197,7 +206,7 @@ void MergeTreeIndexAggregatorUSearch<Metric>::update(const Block & block, size_t
for (size_t current_row = 0; current_row < num_rows; ++current_row)
{
auto rc = index->add(static_cast<uint32_t>(index->size()), &array[offsets[current_row - 1]]);
auto rc = index->add(static_cast<uint32_t>(index->size()), &column_array_data_float_data[column_array_offsets[current_row - 1]]);
if (!rc)
throw Exception(ErrorCodes::INCORRECT_DATA, rc.error.release());
@ -208,9 +217,9 @@ void MergeTreeIndexAggregatorUSearch<Metric>::update(const Block & block, size_t
}
else if (const auto & column_tuple = typeid_cast<const ColumnTuple *>(column_cut.get()))
{
const auto & columns = column_tuple->getColumns();
std::vector<std::vector<Float32>> data{column_tuple->size(), std::vector<Float32>()};
for (const auto & column : columns)
const auto & column_tuple_columns = column_tuple->getColumns();
std::vector<std::vector<Float32>> data(column_tuple->size(), std::vector<Float32>());
for (const auto & column : column_tuple_columns)
{
const auto & pod_array = typeid_cast<const ColumnFloat32 *>(column.get())->getData();
for (size_t i = 0; i < pod_array.size(); ++i)
@ -413,7 +422,8 @@ void usearchIndexValidator(const IndexDescription & index, bool /* attach */)
auto throw_unsupported_underlying_column_exception = []()
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN, "USearch indexes can only be created on columns of type Array(Float32) and Tuple(Float32)");
ErrorCodes::ILLEGAL_COLUMN,
"USearch can only be created on columns of type Array(Float32) and Tuple(Float32[, Float32[, ...]])");
};
DataTypePtr data_type = index.sample_block.getDataTypes()[0];

View File

@ -9,6 +9,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeArray.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Storages/BlockNumberColumn.h>
#include <city.h>
namespace DB
@ -24,7 +25,8 @@ namespace ErrorCodes
static void injectNonConstVirtualColumns(
size_t rows,
Block & block,
const Names & virtual_columns);
const Names & virtual_columns,
MergeTreeReadTask * task = nullptr);
static void injectPartConstVirtualColumns(
size_t rows,
@ -247,7 +249,8 @@ namespace
static void injectNonConstVirtualColumns(
size_t rows,
Block & block,
const Names & virtual_columns)
const Names & virtual_columns,
MergeTreeReadTask * task)
{
VirtualColumnsInserter inserter(block);
for (const auto & virtual_column_name : virtual_columns)
@ -278,6 +281,24 @@ static void injectNonConstVirtualColumns(
inserter.insertUInt8Column(column, virtual_column_name);
}
if (virtual_column_name == BlockNumberColumn::name)
{
ColumnPtr column;
if (rows)
{
size_t value = 0;
if (task)
{
value = task->getInfo().data_part ? task->getInfo().data_part->info.min_block : 0;
}
column = BlockNumberColumn::type->createColumnConst(rows, value)->convertToFullColumnIfConst();
}
else
column = BlockNumberColumn::type->createColumn();
inserter.insertUInt64Column(column, virtual_column_name);
}
}
}
@ -368,7 +389,7 @@ void MergeTreeSelectProcessor::injectVirtualColumns(
{
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
/// Note that the order is important: virtual columns filled by the range reader must go first
injectNonConstVirtualColumns(row_count, block, virtual_columns);
injectNonConstVirtualColumns(row_count, block, virtual_columns,task);
injectPartConstVirtualColumns(row_count, block, task, partition_value_type, virtual_columns);
}

View File

@ -176,7 +176,7 @@ try
current_mark += (rows_to_read == rows_read);
bool should_evaluate_missing_defaults = false;
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read, data_part->info.min_block);
if (should_evaluate_missing_defaults)
{

View File

@ -171,7 +171,8 @@ struct Settings;
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
\
/** Compress marks and primary key. */ \
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \

View File

@ -351,64 +351,6 @@ FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet(const DataTypes & data_types)
return nullptr;
}
FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
const std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> & indexes_mapping,
const DataTypes & data_types) const
{
const auto & prepared_sets = getTreeContext().getPreparedSets();
/// We have `PreparedSetKey::forLiteral` but it is useless here as we don't have enough information
/// about types in left argument of the IN operator. Instead, we manually iterate through all the sets
/// and find the one for the right arg based on the AST structure (getTreeHash), after that we check
/// that the types it was prepared with are compatible with the types of the primary key.
auto types_match = [&indexes_mapping, &data_types](const DataTypes & set_types)
{
assert(indexes_mapping.size() == data_types.size());
for (size_t i = 0; i < indexes_mapping.size(); ++i)
{
if (indexes_mapping[i].tuple_index >= set_types.size())
return false;
auto lhs = removeNullable(recursiveRemoveLowCardinality(data_types[i]));
auto rhs = removeNullable(recursiveRemoveLowCardinality(set_types[indexes_mapping[i].tuple_index]));
if (!lhs->equals(*rhs))
return false;
}
return true;
};
if (prepared_sets && ast_node)
{
if (ast_node->as<ASTSubquery>() || ast_node->as<ASTTableIdentifier>())
return prepared_sets->findSubquery(ast_node->getTreeHash());
auto tree_hash = ast_node->getTreeHash();
const auto & sets = prepared_sets->getSetsFromTuple();
auto it = sets.find(tree_hash);
if (it == sets.end())
return nullptr;
for (const auto & future_set : it->second)
if (types_match(future_set->getTypes()))
return future_set;
}
else
{
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
if (node_without_alias->column)
{
auto future_set = tryGetSetFromDAGNode(node_without_alias);
if (types_match(future_set->getTypes()))
return future_set;
}
}
return nullptr;
}
RPNBuilderFunctionTreeNode RPNBuilderTreeNode::toFunctionNode() const
{
if (!isFunction())

View File

@ -116,11 +116,6 @@ public:
/// Try get prepared set from node that match data types
FutureSetPtr tryGetPreparedSet(const DataTypes & data_types) const;
/// Try get prepared set from node that match indexes mapping and data types
FutureSetPtr tryGetPreparedSet(
const std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> & indexes_mapping,
const DataTypes & data_types) const;
/** Convert node to function node.
* Node must be function before calling these method, otherwise exception is thrown.
*/

View File

@ -150,9 +150,20 @@ static IMergeTreeDataPart::Checksums checkDataPart(
if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, read_settings, std::nullopt, std::nullopt);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
try
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, read_settings, std::nullopt, std::nullopt);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}
catch (const Poco::Exception & ex)
{
throw Exception(ErrorCodes::CORRUPTED_DATA, "Failed to load {}, with error {}", IMergeTreeDataPart::SERIALIZATION_FILE_NAME, ex.message());
}
catch (...)
{
throw;
}
}
auto get_serialization = [&serialization_infos](const auto & column)

View File

@ -102,6 +102,8 @@
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <Storages/BlockNumberColumn.h>
#include <memory>
#include <filesystem>
#include <optional>
@ -298,6 +300,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_part_offset", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_row_exists", std::make_shared<DataTypeUInt8>()),
NameAndTypePair(BlockNumberColumn::name, BlockNumberColumn::type),
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()), /// deprecated
};
}

View File

@ -33,6 +33,7 @@
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Storages/BlockNumberColumn.h>
#include <cassert>
#include <chrono>
@ -45,6 +46,8 @@
namespace DB
{
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
@ -452,10 +455,15 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & c
const auto & data_file = *data_file_it->second;
const auto & columns = metadata_snapshot->getColumns();
CompressionCodecPtr compression;
if (name_and_type.name == BlockNumberColumn::name)
compression = BlockNumberColumn::compression_codec;
else
compression = columns.getCodecOrDefault(name_and_type.name);
it = streams.try_emplace(data_file.name, storage.disk, data_file.path,
storage.file_checker.getFileSize(data_file.path),
columns.getCodecOrDefault(name_and_type.name),
storage.max_compress_block_size).first;
compression, storage.max_compress_block_size).first;
}
auto & stream = it->second;

View File

@ -2231,10 +2231,13 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
}
catch (const Exception & ex)
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
results.emplace_back(part->name, false, "Check of part finished with error: '" + getCurrentExceptionMessage(false) + "'");
}
}
else
@ -2244,9 +2247,12 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
checkDataPart(part, true);
results.emplace_back(part->name, true, "");
}
catch (const Exception & ex)
catch (...)
{
results.emplace_back(part->name, false, ex.message());
if (isRetryableException(std::current_exception()))
throw;
results.emplace_back(part->name, false, getCurrentExceptionMessage(false));
}
}
}

View File

@ -1,5 +1,6 @@
#include <Storages/StorageSnapshot.h>
#include <Storages/LightweightDeleteDescription.h>
#include <Storages/BlockNumberColumn.h>
#include <Storages/IStorage.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/NestedUtils.h>
@ -24,6 +25,7 @@ void StorageSnapshot::init()
if (storage.hasLightweightDeletedMask())
system_columns[LightweightDeleteDescription::FILTER_COLUMN.name] = LightweightDeleteDescription::FILTER_COLUMN.type;
system_columns[BlockNumberColumn::name] = BlockNumberColumn::type;
}
NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const

View File

@ -141,16 +141,6 @@ STATUS_ICON_MAP = defaultdict(
)
def update_pr_status_label(pr: PullRequest, status: str) -> None:
new_label = "pr-status-" + STATUS_ICON_MAP[status]
for label in pr.get_labels():
if label.name == new_label:
return
if label.name.startswith("pr-status-"):
pr.remove_from_labels(label.name)
pr.add_to_labels(new_label)
def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
"""It adds or updates the comment status to all Pull Requests but for release
one, so the method does nothing for simple pushes and pull requests with
@ -190,8 +180,6 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
comment = ic
break
update_pr_status_label(pr, get_worst_state(statuses))
if comment is None:
pr.create_issue_comment(comment_body)
return

View File

@ -1096,6 +1096,7 @@ def test_stop_other_host_during_backup(kill):
if status == "BACKUP_CREATED":
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' SYNC")
node1.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
node1.query("SYSTEM SYNC REPLICA tbl")
assert node1.query("SELECT * FROM tbl ORDER BY x") == TSV([3, 5])
elif status == "BACKUP_FAILED":
assert not os.path.exists(

View File

@ -109,21 +109,15 @@ def test_check_normal_table_corruption(started_cluster):
corrupt_data_part_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
assert (
node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
).strip()
== "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 25."
)
assert node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
assert (
node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
).strip()
== "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 25."
)
assert node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
node1.query(
"INSERT INTO non_replicated_mt VALUES (toDate('2019-01-01'), 1, 10), (toDate('2019-01-01'), 2, 12)"
@ -141,13 +135,10 @@ def test_check_normal_table_corruption(started_cluster):
remove_checksums_on_disk(node1, "non_replicated_mt", "201901_2_2_0")
assert (
node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0},
)
== "201901_2_2_0\t0\tCheck of part finished with error: \\'Cannot read all data. Bytes read: 2. Bytes expected: 25.\\'\n"
)
assert node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0},
).strip().split("\t")[0:2] == ["201901_2_2_0", "0"]
def test_check_replicated_table_simple(started_cluster):

View File

@ -2,3 +2,6 @@
2018-01-01 1 1
2018-01-01 2 2
2018-01-01 2 2
== (Replicas) Test optimize ==
d2 1 0
d4 1 0

View File

@ -1,10 +1,30 @@
set optimize_on_insert = 0;
drop table if exists tab_00577;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1,
vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
insert into tab_00577 values ('2018-01-01', 0, 0);
select * from tab_00577 order by version;
OPTIMIZE TABLE tab_00577;
OPTIMIZE TABLE tab_00577 FINAL CLEANUP;
select * from tab_00577;
drop table tab_00577;
DROP TABLE IF EXISTS testCleanupR1;
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
ORDER BY uid SETTINGS enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
min_bytes_for_wide_part = 0;
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);
SYSTEM SYNC REPLICA testCleanupR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
OPTIMIZE TABLE testCleanupR1 FINAL CLEANUP;
-- Only d3 to d5 remain
SELECT '== (Replicas) Test optimize ==';
SELECT * FROM testCleanupR1 order by uid;
DROP TABLE IF EXISTS testCleanupR1

View File

@ -147,3 +147,4 @@ Expression (Projection)
9000 [9000,0,0,0]
1 (1,0,0,0)
9000 (9000,0,0,0)
--- Bugs ---

View File

@ -281,3 +281,15 @@ ORDER BY L2Distance(vector, (9000.0, 0.0, 0.0, 0.0))
LIMIT 1;
DROP TABLE tab;
SELECT '--- Bugs ---';
-- Arrays with default values are rejected, issue #52258
CREATE TABLE tab (`uuid` String, `vector` Array(Float32), `version` UInt32, INDEX idx vector TYPE annoy()) ENGINE = MergeTree() ORDER BY (uuid);
INSERT INTO tab (uuid, version) VALUES ('1', 3); -- { serverError INCORRECT_DATA }
DROP TABLE tab;
-- Tuples with default value work
CREATE TABLE tab (`uuid` String, `vector` Tuple(Float32, Float32), `version` UInt32, INDEX idx vector TYPE annoy()) ENGINE = MergeTree() ORDER BY (uuid);
INSERT INTO tab (uuid, version) VALUES ('1', 3); -- works fine
DROP TABLE tab;

View File

@ -150,3 +150,4 @@ Expression (Projection)
1 [0,0,10]
2 [0,0,10.5]
3 [0,0,9.5]
--- Bugs ---

View File

@ -274,3 +274,17 @@ SELECT *
FROM tab
WHERE L2Distance(vector, [0.0, 0.0, 10.0]) < 1.0
LIMIT 3;
DROP TABLE tab;
SELECT '--- Bugs ---';
-- Arrays with default values are rejected, issue #52258
CREATE TABLE tab (`uuid` String, `vector` Array(Float32), `version` UInt32, INDEX idx vector TYPE usearch()) ENGINE = MergeTree() ORDER BY (uuid);
INSERT INTO tab (uuid, version) VALUES ('1', 3); -- { serverError INCORRECT_DATA }
DROP TABLE tab;
-- Tuples with default value work
CREATE TABLE tab (`uuid` String, `vector` Tuple(Float32, Float32), `version` UInt32, INDEX idx vector TYPE usearch()) ENGINE = MergeTree() ORDER BY (uuid);
INSERT INTO tab (uuid, version) VALUES ('1', 3); -- works fine
DROP TABLE tab;

View File

@ -3,13 +3,13 @@
SYSTEM DROP QUERY CACHE;
-- rand() is non-deterministic, with default settings no entry in the query cache should be created
SELECT COUNT(rand(1)) SETTINGS use_query_cache = true;
-- rand() is non-deterministic, the query is rejected by default
SELECT COUNT(rand(1)) SETTINGS use_query_cache = true; -- { serverError CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS }
SELECT COUNT(*) FROM system.query_cache;
SELECT '---';
-- But an entry can be forced using a setting
-- Force caching using a setting
SELECT COUNT(RAND(1)) SETTINGS use_query_cache = true, query_cache_store_results_of_queries_with_nondeterministic_functions = true;
SELECT COUNT(*) FROM system.query_cache;

View File

@ -0,0 +1,41 @@
*** BEFORE MUTATION BEFORE MERGE ***
1 1 1 all_1_1_0
2 2 1 all_1_1_0
3 3 1 all_1_1_0
4 4 2 all_2_2_0
5 5 2 all_2_2_0
6 6 2 all_2_2_0
*** AFTER MUTATION BEFORE MERGE ***
1 0 1 all_1_1_0_3
2 0 1 all_1_1_0_3
3 0 1 all_1_1_0_3
4 4 2 all_2_2_0_3
5 5 2 all_2_2_0_3
6 6 2 all_2_2_0_3
*** AFTER MUTATION AFTER MERGE ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
7 7 4 all_4_4_0
8 8 4 all_4_4_0
9 9 4 all_4_4_0
*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***
1 0 1 all_1_4_2_3
2 0 1 all_1_4_2_3
3 0 1 all_1_4_2_3
4 4 2 all_1_4_2_3
5 5 2 all_1_4_2_3
6 6 2 all_1_4_2_3
7 7 4 all_1_4_2_3
8 8 4 all_1_4_2_3
9 9 4 all_1_4_2_3

View File

@ -0,0 +1,32 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (id UInt32, a UInt32) ENGINE = MergeTree ORDER BY id SETTINGS allow_experimental_block_number_column = true;
INSERT INTO test(id,a) VALUES (1,1),(2,2),(3,3);
INSERT INTO test(id,a) VALUES (4,4),(5,5),(6,6);
SELECT '*** BEFORE MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;
set mutations_sync=1;
ALTER TABLE test UPDATE a=0 WHERE id<4;
SELECT '*** AFTER MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;
OPTIMIZE TABLE test FINAL;
SELECT '*** AFTER MUTATION AFTER MERGE ***';
SELECT *,_block_number,_part from test ORDER BY id;
INSERT INTO test(id,a) VALUES (7,7),(8,8),(9,9);
SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***';
SELECT *,_block_number,_part from test ORDER BY id;
OPTIMIZE TABLE test FINAL;
SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***';
SELECT *,_block_number,_part from test ORDER BY id;
DROP TABLE test;

View File

@ -0,0 +1,41 @@
*** BEFORE MUTATION BEFORE MERGE ***
1 1 1 all_1_1_0
2 2 1 all_1_1_0
3 3 1 all_1_1_0
4 4 2 all_2_2_0
5 5 2 all_2_2_0
6 6 2 all_2_2_0
*** AFTER MUTATION BEFORE MERGE ***
1 0 1 all_1_1_0_3
2 0 1 all_1_1_0_3
3 0 1 all_1_1_0_3
4 4 2 all_2_2_0_3
5 5 2 all_2_2_0_3
6 6 2 all_2_2_0_3
*** AFTER MUTATION AFTER MERGE ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
7 7 4 all_4_4_0
8 8 4 all_4_4_0
9 9 4 all_4_4_0
*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***
1 0 1 all_1_4_2_3
2 0 1 all_1_4_2_3
3 0 1 all_1_4_2_3
4 4 2 all_1_4_2_3
5 5 2 all_1_4_2_3
6 6 2 all_1_4_2_3
7 7 4 all_1_4_2_3
8 8 4 all_1_4_2_3
9 9 4 all_1_4_2_3

View File

@ -0,0 +1,36 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (id UInt32, a UInt32) ENGINE = MergeTree ORDER BY id SETTINGS allow_experimental_block_number_column = true,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1;
INSERT INTO test(id,a) VALUES (1,1),(2,2),(3,3);
INSERT INTO test(id,a) VALUES (4,4),(5,5),(6,6);
SELECT '*** BEFORE MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;
set mutations_sync=1;
ALTER TABLE test UPDATE a=0 WHERE id<4;
SELECT '*** AFTER MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;
OPTIMIZE TABLE test FINAL;
SELECT '*** AFTER MUTATION AFTER MERGE ***';
SELECT *,_block_number,_part from test ORDER BY id;
INSERT INTO test(id,a) VALUES (7,7),(8,8),(9,9);
SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***';
SELECT *,_block_number,_part from test ORDER BY id;
OPTIMIZE TABLE test FINAL;
SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***';
SELECT *,_block_number,_part from test ORDER BY id;
DROP TABLE test;

View File

@ -0,0 +1,19 @@
0
1
2
3
*** AFTER FIRST OPTIMIZE ***
0 1
1 2
1 2
2 3
3 3
*** AFTER SECOND OPTIMIZE ***
0 1
1 2
1 2
2 3
3 3
4 4
5 4
6 4

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt8, PROJECTION p (SELECT x GROUP BY x)) ENGINE = MergeTree ORDER BY () SETTINGS allow_experimental_block_number_column=true;
INSERT INTO t VALUES (0);
INSERT INTO t VALUES (1),(1);
INSERT INTO t VALUES (2),(3);
SELECT x FROM t GROUP BY x;
OPTIMIZE TABLE t FINAL;
SELECT '*** AFTER FIRST OPTIMIZE ***';
SELECT x,_block_number FROM t;
INSERT INTO t VALUES (4), (5), (6);
OPTIMIZE TABLE t FINAL;
SELECT '*** AFTER SECOND OPTIMIZE ***';
SELECT x,_block_number FROM t;
DROP TABLE t;

View File

@ -0,0 +1,88 @@
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1

View File

@ -0,0 +1,24 @@
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
id UInt64,
value UInt64
) ENGINE=MergeTree ORDER BY (id, value);
INSERT INTO test_table SELECT number, number FROM numbers(10);
SET allow_experimental_analyzer = 0;
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT 5);
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT '5');
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toUInt8(number) FROM numbers(5));
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toString(number) FROM numbers(5));
SET allow_experimental_analyzer = 1;
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT 5);
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT '5');
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toUInt8(number) FROM numbers(5));
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toString(number) FROM numbers(5));
DROP TABLE test_table;

View File

@ -1,10 +0,0 @@
DROP TABLE IF EXISTS test_dup_index;
CREATE TABLE test_dup_index
(
a Int64,
b Int64,
INDEX idx_a a TYPE minmax,
INDEX idx_a b TYPE minmax
) Engine = MergeTree()
ORDER BY a; -- { serverError ILLEGAL_INDEX }