Merge branch 'master' into query-plan-serialization-steps

This commit is contained in:
Nikolai Kochetov 2024-12-03 16:16:01 +01:00
commit c00a85a601
125 changed files with 1140 additions and 449 deletions

View File

@ -66,6 +66,7 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
# Twice as large
set (RLIMIT_DATA 10000000000)
set (RLIMIT_AS 20000000000)
set (RLIMIT_CPU 2000)
endif()
# For some files currently building RISCV64/LOONGARCH64 might be too slow.

View File

@ -82,5 +82,5 @@ ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse"
ENV EXPORT_S3_STORAGE_POLICIES=1
RUN npm install -g azurite@3.30.0 \
RUN npm install -g azurite@3.33.0 \
&& npm install -g tslib && npm install -g node

View File

@ -733,6 +733,18 @@ SELECT JSONExtract('{"day": "Thursday"}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday
SELECT JSONExtract('{"day": 5}', 'day', 'Enum8(\'Sunday\' = 0, \'Monday\' = 1, \'Tuesday\' = 2, \'Wednesday\' = 3, \'Thursday\' = 4, \'Friday\' = 5, \'Saturday\' = 6)') = 'Friday'
```
Referring to nested values by passing multiple `indices_or_keys` parameters:
```
SELECT JSONExtract('{"a":{"b":"hello","c":{"d":[1,2,3],"e":[1,3,7]}}}','a','c','Map(String, Array(UInt8))') AS val, toTypeName(val), val['d'];
```
Result:
```
┌─val───────────────────────┬─toTypeName(val)───────────┬─arrayElement(val, 'd')─┐
│ {'d':[1,2,3],'e':[1,3,7]} │ Map(String, Array(UInt8)) │ [1,2,3] │
└───────────────────────────┴───────────────────────────┴────────────────────────┘
```
### JSONExtractKeysAndValues
Parses key-value pairs from JSON where the values are of the given ClickHouse data type.

View File

@ -173,18 +173,7 @@ Result:
└───┴────┴─────┘
```
## [experimental] Join with inequality conditions for columns from different tables
:::note
This feature is experimental. To use it, set `allow_experimental_join_condition` to 1 in your configuration files or by using the `SET` command:
```sql
SET allow_experimental_join_condition=1
```
Otherwise, you'll get `INVALID_JOIN_ON_EXPRESSION`.
:::
## Join with inequality conditions for columns from different tables
ClickHouse currently supports `ALL/ANY/SEMI/ANTI INNER/LEFT/RIGHT/FULL JOIN` with inequality conditions in addition to equality conditions. Inequality conditions are supported only for the `hash` and `grace_hash` join algorithms, and they are not supported with `join_use_nulls`.

View File

@ -50,6 +50,10 @@ SELECT name FROM mysql(`mysql1:3306|mysql2:3306|mysql3:3306`, 'mysql_database',
A table object with the same columns as the original MySQL table.
:::note
Some MySQL data types can be mapped to different ClickHouse types; this is controlled by the query-level setting [mysql_datatypes_support_level](/docs/en/operations/settings/settings.md#mysql_datatypes_support_level)
:::
:::note
In an `INSERT` query, to distinguish the table function `mysql(...)` from a table name followed by a list of column names, you must use the keywords `FUNCTION` or `TABLE FUNCTION`. See examples below.
:::
@ -141,3 +145,7 @@ WHERE id > (SELECT max(id) from mysql_copy);
- [The MySQL table engine](../../engines/table-engines/integrations/mysql.md)
- [Using MySQL as a dictionary source](../../sql-reference/dictionaries/index.md#dictionary-sources#dicts-external_dicts_dict_sources-mysql)
- [mysql_datatypes_support_level](/docs/en/operations/settings/settings.md#mysql_datatypes_support_level)
- [mysql_map_fixed_string_to_text_in_show_columns](/docs/en/operations/settings/settings.md#mysql_map_fixed_string_to_text_in_show_columns)
- [mysql_map_string_to_text_in_show_columns](/docs/en/operations/settings/settings.md#mysql_map_string_to_text_in_show_columns)
- [mysql_max_rows_to_insert](/docs/en/operations/settings/settings.md#mysql_max_rows_to_insert)

View File

@ -66,10 +66,7 @@ ASTPtr ArrayJoinNode::toASTImpl(const ConvertToASTOptions & options) const
auto * column_node = array_join_expression->as<ColumnNode>();
if (column_node && column_node->getExpression())
{
if (const auto * function_node = column_node->getExpression()->as<FunctionNode>(); function_node && function_node->getFunctionName() == "nested")
array_join_expression_ast = array_join_expression->toAST(options);
else
array_join_expression_ast = column_node->getExpression()->toAST(options);
array_join_expression_ast = column_node->getExpression()->toAST(options);
}
else
array_join_expression_ast = array_join_expression->toAST(options);

View File

@ -165,6 +165,9 @@ struct IdentifierResolveScope
/// Table expression node to data
std::unordered_map<QueryTreeNodePtr, AnalysisTableExpressionData> table_expression_node_to_data;
/// Table expression nodes that appear in the join tree of the corresponding query
std::unordered_set<QueryTreeNodePtr> registered_table_expression_nodes;
QueryTreeNodePtrWithHashIgnoreTypesSet nullable_group_by_keys;
/// Here we count the number of nullable GROUP BY keys we met while resolving expressions.
/// E.g. for a query `SELECT tuple(tuple(number)) FROM numbers(10) GROUP BY (number, tuple(number)) with cube`

View File

@ -565,7 +565,7 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromExpressionArguments
bool IdentifierResolver::tryBindIdentifierToAliases(const IdentifierLookup & identifier_lookup, const IdentifierResolveScope & scope)
{
return scope.aliases.find(identifier_lookup, ScopeAliases::FindOption::FIRST_NAME) != nullptr || scope.aliases.array_join_aliases.contains(identifier_lookup.identifier.front());
return scope.aliases.find(identifier_lookup, ScopeAliases::FindOption::FIRST_NAME) != nullptr;
}
/** Resolve identifier from table columns.
@ -680,6 +680,27 @@ bool IdentifierResolver::tryBindIdentifierToTableExpressions(const IdentifierLoo
return can_bind_identifier_to_table_expression;
}
bool IdentifierResolver::tryBindIdentifierToArrayJoinExpressions(const IdentifierLookup & identifier_lookup, const IdentifierResolveScope & scope)
{
bool result = false;
for (const auto & table_expression : scope.registered_table_expression_nodes)
{
auto * array_join_node = table_expression->as<ArrayJoinNode>();
if (!array_join_node)
continue;
for (const auto & array_join_expression : array_join_node->getJoinExpressions())
{
auto array_join_expression_alias = array_join_expression->getAlias();
if (identifier_lookup.identifier.front() == array_join_expression_alias)
return true;
}
}
return result;
}
QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromStorage(
const Identifier & identifier,
const QueryTreeNodePtr & table_expression_node,
@ -1415,9 +1436,6 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromArrayJoin(const Ide
IdentifierView identifier_view(identifier_lookup.identifier);
if (identifier_view.isCompound() && from_array_join_node.hasAlias() && identifier_view.front() == from_array_join_node.getAlias())
identifier_view.popFirst();
const auto & alias_or_name = array_join_column_expression_typed.hasAlias()
? array_join_column_expression_typed.getAlias()
: array_join_column_expression_typed.getColumnName();
@ -1429,18 +1447,16 @@ QueryTreeNodePtr IdentifierResolver::tryResolveIdentifierFromArrayJoin(const Ide
else
continue;
auto array_join_column = std::make_shared<ColumnNode>(array_join_column_expression_typed.getColumn(),
array_join_column_expression_typed.getColumnSource());
if (identifier_view.empty())
{
auto array_join_column = std::make_shared<ColumnNode>(array_join_column_expression_typed.getColumn(),
array_join_column_expression_typed.getColumnSource());
return array_join_column;
}
/// Resolve subcolumns. Example : SELECT x.y.z FROM tab ARRAY JOIN arr AS x
auto compound_expr = tryResolveIdentifierFromCompoundExpression(
identifier_lookup.identifier,
identifier_lookup.identifier.getPartsSize() - identifier_view.getPartsSize() /*identifier_bind_size*/,
array_join_column_expression,
array_join_column,
{} /* compound_expression_source */,
scope,
true /* can_be_not_found */);

View File

@ -110,6 +110,9 @@ public:
const QueryTreeNodePtr & table_expression_node,
const IdentifierResolveScope & scope);
static bool tryBindIdentifierToArrayJoinExpressions(const IdentifierLookup & identifier_lookup,
const IdentifierResolveScope & scope);
QueryTreeNodePtr tryResolveIdentifierFromTableExpression(const IdentifierLookup & identifier_lookup,
const QueryTreeNodePtr & table_expression_node,
IdentifierResolveScope & scope);

View File

@ -1,6 +1,6 @@
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Resolve/QueryAnalyzer.h>
#include <Analyzer/createUniqueTableAliases.h>
#include <Analyzer/createUniqueAliasesIfNecessary.h>
namespace DB
{
@ -16,7 +16,7 @@ void QueryAnalysisPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr conte
{
QueryAnalyzer analyzer(only_analyze);
analyzer.resolve(query_tree_node, table_expression, context);
createUniqueTableAliases(query_tree_node, table_expression, context);
createUniqueAliasesIfNecessary(query_tree_node, context);
}
}

View File

@ -1593,7 +1593,7 @@ void QueryAnalyzer::qualifyColumnNodesWithProjectionNames(const QueryTreeNodes &
if (need_to_qualify)
need_to_qualify = IdentifierResolver::tryBindIdentifierToTableExpressions(identifier_lookup, table_expression_node, scope);
if (IdentifierResolver::tryBindIdentifierToAliases(identifier_lookup, scope))
if (IdentifierResolver::tryBindIdentifierToAliases(identifier_lookup, scope) || IdentifierResolver::tryBindIdentifierToArrayJoinExpressions(identifier_lookup, scope))
need_to_qualify = true;
if (need_to_qualify)
@ -4974,6 +4974,16 @@ void QueryAnalyzer::resolveArrayJoin(QueryTreeNodePtr & array_join_node, Identif
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"ARRAY JOIN requires at least single expression");
/// Register expression aliases in the scope
for (const auto & elem : array_join_nodes)
{
for (auto & child : elem->getChildren())
{
if (child)
expressions_visitor.visit(child);
}
}
std::vector<QueryTreeNodePtr> array_join_column_expressions;
array_join_column_expressions.reserve(array_join_nodes_size);
@ -4981,18 +4991,6 @@ void QueryAnalyzer::resolveArrayJoin(QueryTreeNodePtr & array_join_node, Identif
{
auto array_join_expression_alias = array_join_expression->getAlias();
for (const auto & elem : array_join_nodes)
{
if (elem->hasAlias())
scope.aliases.array_join_aliases.insert(elem->getAlias());
for (auto & child : elem->getChildren())
{
if (child)
expressions_visitor.visit(child);
}
}
std::string identifier_full_name;
if (auto * identifier_node = array_join_expression->as<IdentifierNode>())
@ -5365,6 +5363,7 @@ void QueryAnalyzer::resolveQueryJoinTreeNode(QueryTreeNodePtr & join_tree_node,
};
add_table_expression_alias_into_scope(join_tree_node);
scope.registered_table_expression_nodes.insert(join_tree_node);
scope.table_expressions_in_resolve_process.erase(join_tree_node.get());
}

View File

@ -27,10 +27,6 @@ struct ScopeAliases
std::unordered_set<QueryTreeNodePtr> nodes_with_duplicated_aliases;
std::vector<QueryTreeNodePtr> cloned_nodes_with_duplicated_aliases;
/// Names which are aliases from ARRAY JOIN.
/// This is needed to properly qualify columns from matchers and avoid name collision.
std::unordered_set<std::string> array_join_aliases;
std::unordered_map<std::string, QueryTreeNodePtr> & getAliasMap(IdentifierLookupContext lookup_context)
{
switch (lookup_context)

View File

@ -1,6 +1,7 @@
#include <memory>
#include <unordered_map>
#include <Analyzer/createUniqueTableAliases.h>
#include <Analyzer/createUniqueAliasesIfNecessary.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/IQueryTreeNode.h>
@ -48,8 +49,6 @@ public:
case QueryTreeNodeType::TABLE:
[[fallthrough]];
case QueryTreeNodeType::TABLE_FUNCTION:
[[fallthrough]];
case QueryTreeNodeType::ARRAY_JOIN:
{
auto & alias = table_expression_to_alias[node];
if (alias.empty())
@ -60,6 +59,12 @@ public:
}
break;
}
case QueryTreeNodeType::ARRAY_JOIN:
{
/// Simulate the previous behaviour and preserve table naming compatible with previous versions
++next_id;
break;
}
default:
break;
}
@ -130,12 +135,97 @@ private:
std::unordered_map<QueryTreeNodePtr, String> table_expression_to_alias;
};
}
void createUniqueTableAliases(QueryTreeNodePtr & node, const QueryTreeNodePtr & /*table_expression*/, const ContextPtr & context)
class CreateUniqueArrayJoinAliasesVisitor : public InDepthQueryTreeVisitorWithContext<CreateUniqueArrayJoinAliasesVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<CreateUniqueArrayJoinAliasesVisitor>;
using Base::Base;
void enterImpl(QueryTreeNodePtr & node)
{
if (auto * array_join_typed = node->as<ArrayJoinNode>())
{
populateRenamingMap(array_join_typed, renaming[array_join_typed]);
return;
}
auto * column_node = node->as<ColumnNode>();
if (!column_node || replaced_nodes_set.contains(node))
return;
auto column_source = column_node->getColumnSource();
auto * array_join = column_source->as<ArrayJoinNode>();
if (!array_join)
return;
auto & renaming_map = getRenamingMap(array_join);
auto new_column = column_node->getColumn();
new_column.name = renaming_map[column_node->getColumnName()];
auto new_column_node = std::make_shared<ColumnNode>(new_column, column_source);
node = std::move(new_column_node);
replaced_nodes_set.insert(node);
}
private:
using RenamingMap = std::unordered_map<String, String>;
void populateRenamingMap(ArrayJoinNode * array_join, RenamingMap & result)
{
if (result.empty())
{
for (auto & array_join_expression : array_join->getJoinExpressions())
{
auto * array_join_column = array_join_expression->as<ColumnNode>();
chassert(array_join_column != nullptr);
String unique_expression_name = fmt::format("__array_join_exp_{}", ++next_id);
result.emplace(array_join_column->getColumnName(), unique_expression_name);
auto replacement_column = array_join_column->getColumn();
replacement_column.name = unique_expression_name;
auto replacement_column_node = std::make_shared<ColumnNode>(replacement_column, array_join_column->getExpression(), array_join_column->getColumnSource());
replacement_column_node->setAlias(unique_expression_name);
array_join_expression = std::move(replacement_column_node);
replaced_nodes_set.insert(array_join_expression);
}
}
}
RenamingMap & getRenamingMap(ArrayJoinNode * array_join)
{
auto & result = renaming[array_join];
populateRenamingMap(array_join, result);
return result;
}
size_t next_id = 0;
std::unordered_map<ArrayJoinNode *, RenamingMap> renaming;
// TODO: Remove this field when identifier resolution cache removed from analyzer.
std::unordered_set<QueryTreeNodePtr> replaced_nodes_set;
};
}
void createUniqueAliasesIfNecessary(QueryTreeNodePtr & node, const ContextPtr & context)
{
/*
* For each table expression in the Query Tree generate and add a unique alias.
* If table expression had an alias in initial query tree, override it.
*/
CreateUniqueTableAliasesVisitor(context).visit(node);
/* Generate unique aliases for array join expressions.
* It's required to create a valid AST for distributed query.
*/
CreateUniqueArrayJoinAliasesVisitor(context).visit(node);
}
}
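The visitor above builds a per-`ARRAY JOIN` renaming map lazily and reuses it for every later column reference. A minimal standalone sketch of that pattern, with hypothetical names and none of the query-tree machinery:

```cpp
#include <cstddef>
#include <string>
#include <unordered_map>

/// Sketch only: lazily assign each ARRAY JOIN expression a generated unique name
/// of the form __array_join_exp_N and return the same name on repeated lookups.
struct ArrayJoinRenaming
{
    size_t next_id = 0;
    std::unordered_map<std::string, std::string> renaming;   /// original column name -> unique name

    const std::string & uniqueNameFor(const std::string & original)
    {
        auto it = renaming.find(original);
        if (it == renaming.end())
            it = renaming.emplace(original, "__array_join_exp_" + std::to_string(++next_id)).first;
        return it->second;
    }
};

/// ArrayJoinRenaming r;
/// r.uniqueNameFor("arr") == "__array_join_exp_1"; a second call returns the same name.
```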

View File

@ -0,0 +1,14 @@
#pragma once
#include <memory>
#include <Interpreters/Context_fwd.h>
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
namespace DB
{
void createUniqueAliasesIfNecessary(QueryTreeNodePtr & node, const ContextPtr & context);
}

View File

@ -1,18 +0,0 @@
#pragma once
#include <memory>
#include <Interpreters/Context_fwd.h>
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
namespace DB
{
/*
* For each table expression in the Query Tree generate and add a unique alias.
* If table expression had an alias in initial query tree, override it.
*/
void createUniqueTableAliases(QueryTreeNodePtr & node, const QueryTreeNodePtr & table_expression, const ContextPtr & context);
}

View File

@ -24,7 +24,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
@ -234,11 +233,8 @@ bool BackupWriterAzureBlobStorage::fileExists(const String & file_name)
UInt64 BackupWriterAzureBlobStorage::getFileSize(const String & file_name)
{
String key = fs::path(blob_path) / file_name;
RelativePathsWithMetadata children;
object_storage->listObjects(key,children,/*max_keys*/0);
if (children.empty())
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Object must exist");
return children[0]->metadata->size_bytes;
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
return object_metadata.size_bytes;
}
std::unique_ptr<ReadBuffer> BackupWriterAzureBlobStorage::readFile(const String & file_name, size_t /*expected_file_size*/)

View File

@ -38,7 +38,7 @@ public:
private:
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
AzureBlobStorage::ConnectionParams connection_params;
String blob_path;
std::unique_ptr<AzureObjectStorage> object_storage;
@ -88,7 +88,7 @@ private:
void removeFilesBatch(const Strings & file_names);
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
AzureBlobStorage::ConnectionParams connection_params;
String blob_path;
std::unique_ptr<AzureObjectStorage> object_storage;

View File

@ -151,7 +151,8 @@ void ConnectionEstablisherAsync::Task::run(AsyncCallback async_callback, Suspend
{
connection_establisher_async.reset();
connection_establisher_async.connection_establisher.setAsyncCallback(async_callback);
connection_establisher_async.connection_establisher.run(connection_establisher_async.result, connection_establisher_async.fail_message);
connection_establisher_async.connection_establisher.run(connection_establisher_async.result,
connection_establisher_async.fail_message, connection_establisher_async.force_connected);
connection_establisher_async.is_finished = true;
}

View File

@ -76,6 +76,8 @@ public:
const std::string & getFailMessage() const { return fail_message; }
void resumeConnectionWithForceOption(bool force_connected_) {force_connected = force_connected_; resume();}
private:
bool checkBeforeTaskResume() override;
@ -125,6 +127,7 @@ private:
bool is_finished = false;
bool restarted = false;
bool force_connected = false;
};
#endif

View File

@ -281,7 +281,7 @@ int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking, AsyncCallbac
HedgedConnectionsFactory::State HedgedConnectionsFactory::resumeConnectionEstablisher(int index, Connection *& connection_out)
{
replicas[index].connection_establisher->resume();
replicas[index].connection_establisher->resumeConnectionWithForceOption(/*force_connected_*/ shuffled_pools[index].error_count != 0);
if (replicas[index].connection_establisher->isCancelled())
return State::CANNOT_CHOOSE;

View File

@ -256,10 +256,6 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed);
Int64 will_be_rss = size ? size + rss.fetch_add(size, std::memory_order_relaxed) : rss.load(std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
CurrentMetrics::add(metric_loaded, size);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
@ -371,6 +367,10 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
}
}
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
CurrentMetrics::add(metric_loaded, size);
if (peak_updated && allocation_traced)
{
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);

View File

@ -229,6 +229,7 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks", ValueType::Microseconds) \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks", ValueType::Number) \
M(LoadedMarksFiles, "Number of mark files loaded.", ValueType::Number) \
M(LoadedMarksCount, "Number of marks loaded (total across columns).", ValueType::Number) \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.", ValueType::Bytes) \
M(LoadedPrimaryIndexFiles, "Number of primary index files loaded.", ValueType::Number) \
@ -814,6 +815,7 @@ The server successfully detected this situation and will download merged part fr
M(LogWarning, "Number of log messages with level Warning", ValueType::Number) \
M(LogError, "Number of log messages with level Error", ValueType::Number) \
M(LogFatal, "Number of log messages with level Fatal", ValueType::Number) \
M(LoggerElapsedNanoseconds, "Cumulative time spent in logging", ValueType::Nanoseconds) \
\
M(InterfaceHTTPSendBytes, "Number of bytes sent through HTTP interfaces", ValueType::Bytes) \
M(InterfaceHTTPReceiveBytes, "Number of bytes received through HTTP interfaces", ValueType::Bytes) \
@ -1087,6 +1089,11 @@ void incrementForLogMessage(Poco::Message::Priority priority)
}
}
void incrementLoggerElapsedNanoseconds(UInt64 ns)
{
increment(LoggerElapsedNanoseconds, ns);
}
CountersIncrement::CountersIncrement(Counters::Snapshot const & snapshot)
{
init();

View File

@ -173,6 +173,9 @@ namespace ProfileEvents
/// Increment a counter for log messages.
void incrementForLogMessage(Poco::Message::Priority priority);
/// Increment time consumed by logging.
void incrementLoggerElapsedNanoseconds(UInt64 ns);
/// Get name of event by identifier. Returns statically allocated string.
const char * getName(Event event);

View File

@ -11,6 +11,7 @@
#include <Common/Logger.h>
#include <Common/LoggingFormatStringHelpers.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
@ -69,6 +70,7 @@ namespace impl
if (!_is_clients_log && !_logger->is((PRIORITY))) \
break; \
\
Stopwatch _logger_watch; \
try \
{ \
ProfileEvents::incrementForLogMessage(PRIORITY); \
@ -122,6 +124,7 @@ namespace impl
{ \
::write(STDERR_FILENO, static_cast<const void *>(MESSAGE_FOR_EXCEPTION_ON_LOGGING), sizeof(MESSAGE_FOR_EXCEPTION_ON_LOGGING)); \
} \
ProfileEvents::incrementLoggerElapsedNanoseconds(_logger_watch.elapsedNanoseconds()); \
} while (false)
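The macro change above starts a `Stopwatch` before the logging work and accounts the elapsed time into the new `LoggerElapsedNanoseconds` profile event. A minimal standalone sketch of that measure-and-accumulate pattern; the counter and `Stopwatch` here are simplified stand-ins, not the real ClickHouse classes:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>

/// Simplified stand-ins for ProfileEvents::increment and Common/Stopwatch.h.
std::atomic<uint64_t> logger_elapsed_ns{0};

struct Stopwatch
{
    std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();

    uint64_t elapsedNanoseconds() const
    {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now() - start).count();
    }
};

void logMessage(const std::string & text)
{
    Stopwatch watch;                /// started before formatting and writing
    std::cerr << text << '\n';      /// stands in for the actual channel write
    logger_elapsed_ns.fetch_add(watch.elapsedNanoseconds(), std::memory_order_relaxed);
}
```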

View File

@ -35,61 +35,71 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
}
std::string escaped_with_globs = buf_for_escaping.str();
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and ","
std::string_view input(escaped_with_globs);
static const re2::RE2 range_regex(R"({([\d]+\.\.[\d]+)})"); /// regexp for {M..N}, where M and N are non-negative integers
static const re2::RE2 enum_regex(R"({([^{}*,]+[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3}; expressions must not contain "{", "}", "*" or ","
std::string_view matched;
std::string_view input(escaped_with_globs);
std::ostringstream oss_for_replacing; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss_for_replacing.exceptions(std::ios::failbit);
size_t current_index = 0;
while (RE2::FindAndConsume(&input, enum_or_range, &matched))
while (RE2::FindAndConsume(&input, range_regex, &matched))
{
std::string buffer(matched);
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
if (!buffer.contains(','))
size_t range_begin = 0;
size_t range_end = 0;
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t output_width = 0;
if (range_begin > range_end) //Descending Sequence {20..15} {9..01}
{
size_t range_begin = 0;
size_t range_end = 0;
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; ///Special Case: {0..10} {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t output_width = 0;
if (range_begin > range_end) //Descending Sequence {20..15} {9..01}
{
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; ///Special Case: {0..10} {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << range_begin;
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << range_begin;
oss_for_replacing << i;
}
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << i;
}
}
else
{
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
}
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
while (RE2::FindAndConsume(&input, enum_regex, &matched))
{
std::string buffer(matched);
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
oss_for_replacing << escaped_with_globs.substr(current_index);
std::string almost_res = oss_for_replacing.str();
WriteBufferFromOwnString buf_final_processing;
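For reference, a minimal standalone sketch of the `{M..N}` expansion performed above, without the RE2 scanning: `{01..03}` becomes `(01|02|03)`, padding with zeros to the wider bound's width when the range is written with leading zeros.

```cpp
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>

/// Sketch only: expand a numeric range into a regex alternation, e.g.
/// expandRange(1, 3, /*leading_zeros=*/ true, /*width=*/ 2) == "(01|02|03)".
std::string expandRange(size_t range_begin, size_t range_end, bool leading_zeros, size_t width)
{
    std::ostringstream oss;
    oss << '(';
    for (size_t i = range_begin; i <= range_end; ++i)
    {
        if (i != range_begin)
            oss << '|';
        if (leading_zeros)
            oss << std::setfill('0') << std::setw(static_cast<int>(width));
        oss << i;
    }
    oss << ')';
    return oss.str();
}
```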

View File

@ -12,6 +12,9 @@ TEST(Common, makeRegexpPatternFromGlobs)
EXPECT_EQ(makeRegexpPatternFromGlobs("*"), "[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("/?"), "/[^/]");
EXPECT_EQ(makeRegexpPatternFromGlobs("/*"), "/[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("{123}"), "(123)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{test}"), "(test)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{test.tar.gz}"), "(test\\.tar\\.gz)");
EXPECT_EQ(makeRegexpPatternFromGlobs("*_{{a,b,c,d}}/?.csv"), "[^/]*_\\{(a|b|c|d)\\}/[^/]\\.csv");
/* Regex Parsing for {..} can have three possible cases
1) The left range width == the right range width

View File

@ -479,12 +479,20 @@ public:
return true;
}
template <typename T> auto & safeGet() const
template <typename T> const auto & safeGet() const &
{
return const_cast<Field *>(this)->safeGet<T>();
}
template <typename T> auto safeGet() const &&
{
return std::move(const_cast<Field *>(this)->safeGet<T>());
}
template <typename T> auto & safeGet();
template <typename T> auto & safeGet() &;
template <typename T> auto safeGet() &&
{
return std::move(safeGet<T>());
}
bool operator< (const Field & rhs) const
{
@ -880,7 +888,7 @@ constexpr bool isInt64OrUInt64orBoolFieldType(Field::Types::Which t)
}
template <typename T>
auto & Field::safeGet()
auto & Field::safeGet() &
{
const Types::Which target = TypeToEnum<NearestFieldType<std::decay_t<T>>>::value;

View File

@ -37,7 +37,7 @@ namespace ErrorCodes
}
ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
ContainerClientPtr blob_container_client_,
const String & path_,
const ReadSettings & read_settings_,
size_t max_single_read_retries_,
@ -228,7 +228,7 @@ void ReadBufferFromAzureBlobStorage::initialize()
try
{
ProfileEvents::increment(ProfileEvents::AzureGetObject);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);
auto download_response = blob_client->Download(download_options);
@ -281,7 +281,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran
try
{
ProfileEvents::increment(ProfileEvents::AzureGetObject);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetObject);
Azure::Storage::Blobs::DownloadBlobOptions download_options;

View File

@ -8,7 +8,7 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <IO/WithFileName.h>
#include <azure/storage/blobs.hpp>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
namespace DB
{
@ -16,9 +16,11 @@ namespace DB
class ReadBufferFromAzureBlobStorage : public ReadBufferFromFileBase
{
public:
using ContainerClientPtr = std::shared_ptr<const AzureBlobStorage::ContainerClient>;
using BlobClientPtr = std::unique_ptr<const AzureBlobStorage::BlobClient>;
ReadBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
ContainerClientPtr blob_container_client_,
const String & path_,
const ReadSettings & read_settings_,
size_t max_single_read_retries_,
@ -53,8 +55,8 @@ private:
void initialize();
std::unique_ptr<Azure::Core::IO::BodyStream> data_stream;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
std::unique_ptr<Azure::Storage::Blobs::BlobClient> blob_client;
ContainerClientPtr blob_container_client;
BlobClientPtr blob_client;
const String path;
size_t max_single_read_retries;

View File

@ -54,7 +54,7 @@ BufferAllocationPolicyPtr createBufferAllocationPolicy(const AzureBlobStorage::R
}
WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
AzureClientPtr blob_container_client_,
const String & blob_path_,
size_t buf_size_,
const WriteSettings & write_settings_,
@ -142,7 +142,7 @@ void WriteBufferFromAzureBlobStorage::preFinalize()
if (block_ids.empty() && detached_part_data.size() == 1 && detached_part_data.front().data_size <= max_single_part_upload_size)
{
ProfileEvents::increment(ProfileEvents::AzureUpload);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureUpload);
auto part_data = std::move(detached_part_data.front());
@ -174,7 +174,7 @@ void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
ProfileEvents::increment(ProfileEvents::AzureCommitBlockList);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCommitBlockList);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, max_unexpected_write_error_retries);
@ -311,7 +311,7 @@ void WriteBufferFromAzureBlobStorage::writePart(WriteBufferFromAzureBlobStorage:
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
ProfileEvents::increment(ProfileEvents::AzureStageBlock);
if (blob_container_client->GetClickhouseOptions().IsClientForDisk)
if (blob_container_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureStageBlock);
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(std::get<1>(*worker_data).memory.data()), data_size);

View File

@ -28,7 +28,7 @@ class TaskTracker;
class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
{
public:
using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
using AzureClientPtr = std::shared_ptr<const AzureBlobStorage::ContainerClient>;
WriteBufferFromAzureBlobStorage(
AzureClientPtr blob_container_client_,

View File

@ -45,6 +45,7 @@ namespace Setting
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace AzureBlobStorage
@ -81,6 +82,50 @@ static bool isConnectionString(const std::string & candidate)
return !candidate.starts_with("http");
}
ContainerClientWrapper::ContainerClientWrapper(RawContainerClient client_, String blob_prefix_)
: client(std::move(client_)), blob_prefix(std::move(blob_prefix_))
{
}
BlobClient ContainerClientWrapper::GetBlobClient(const String & blob_name) const
{
return client.GetBlobClient(blob_prefix / blob_name);
}
BlockBlobClient ContainerClientWrapper::GetBlockBlobClient(const String & blob_name) const
{
return client.GetBlockBlobClient(blob_prefix / blob_name);
}
BlobContainerPropertiesRespones ContainerClientWrapper::GetProperties() const
{
return client.GetProperties();
}
ListBlobsPagedResponse ContainerClientWrapper::ListBlobs(const ListBlobsOptions & options) const
{
auto new_options = options;
new_options.Prefix = blob_prefix / options.Prefix.ValueOr("");
auto response = client.ListBlobs(new_options);
auto blob_prefix_str = blob_prefix.empty() ? "" : blob_prefix.string() + "/";
for (auto & blob : response.Blobs)
{
if (!blob.Name.starts_with(blob_prefix_str))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected prefix '{}' in blob name '{}'", blob_prefix_str, blob.Name);
blob.Name = blob.Name.substr(blob_prefix_str.size());
}
return response;
}
bool ContainerClientWrapper::IsClientForDisk() const
{
return client.GetClickhouseOptions().IsClientForDisk;
}
String ConnectionParams::getConnectionURL() const
{
if (std::holds_alternative<ConnectionString>(auth_method))
@ -99,7 +144,7 @@ std::unique_ptr<ServiceClient> ConnectionParams::createForService() const
if constexpr (std::is_same_v<T, ConnectionString>)
return std::make_unique<ServiceClient>(ServiceClient::CreateFromConnectionString(auth.toUnderType(), client_options));
else
return std::make_unique<ServiceClient>(endpoint.getEndpointWithoutContainer(), auth, client_options);
return std::make_unique<ServiceClient>(endpoint.getServiceEndpoint(), auth, client_options);
}, auth_method);
}
@ -108,9 +153,15 @@ std::unique_ptr<ContainerClient> ConnectionParams::createForContainer() const
return std::visit([this]<typename T>(const T & auth)
{
if constexpr (std::is_same_v<T, ConnectionString>)
return std::make_unique<ContainerClient>(ContainerClient::CreateFromConnectionString(auth.toUnderType(), endpoint.container_name, client_options));
{
auto raw_client = RawContainerClient::CreateFromConnectionString(auth.toUnderType(), endpoint.container_name, client_options);
return std::make_unique<ContainerClient>(std::move(raw_client), endpoint.prefix);
}
else
return std::make_unique<ContainerClient>(endpoint.getEndpoint(), auth, client_options);
{
RawContainerClient raw_client{endpoint.getContainerEndpoint(), auth, client_options};
return std::make_unique<ContainerClient>(std::move(raw_client), endpoint.prefix);
}
}, auth_method);
}
@ -245,7 +296,7 @@ void processURL(const String & url, const String & container_name, Endpoint & en
static bool containerExists(const ContainerClient & client)
{
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client.GetClickhouseOptions().IsClientForDisk)
if (client.IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
try
@ -283,7 +334,8 @@ std::unique_ptr<ContainerClient> getContainerClient(const ConnectionParams & par
if (params.client_options.ClickhouseOptions.IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCreateContainer);
return std::make_unique<ContainerClient>(service_client->CreateBlobContainer(params.endpoint.container_name).Value);
auto raw_client = service_client->CreateBlobContainer(params.endpoint.container_name).Value;
return std::make_unique<ContainerClient>(std::move(raw_client), params.endpoint.prefix);
}
catch (const Azure::Storage::StorageException & e)
{

View File

@ -14,6 +14,9 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context_fwd.h>
#include <base/strong_typedef.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
@ -23,11 +26,6 @@ struct Settings;
namespace AzureBlobStorage
{
using ServiceClient = Azure::Storage::Blobs::BlobServiceClient;
using ContainerClient = Azure::Storage::Blobs::BlobContainerClient;
using BlobClient = Azure::Storage::Blobs::BlobClient;
using BlobClientOptions = Azure::Storage::Blobs::BlobClientOptions;
struct RequestSettings
{
RequestSettings() = default;
@ -65,7 +63,7 @@ struct Endpoint
String sas_auth;
std::optional<bool> container_already_exists;
String getEndpoint() const
String getContainerEndpoint() const
{
String url = storage_account_url;
if (url.ends_with('/'))
@ -77,16 +75,13 @@ struct Endpoint
if (!container_name.empty())
url += "/" + container_name;
if (!prefix.empty())
url += "/" + prefix;
if (!sas_auth.empty())
url += "?" + sas_auth;
return url;
}
String getEndpointWithoutContainer() const
String getServiceEndpoint() const
{
String url = storage_account_url;
@ -100,6 +95,35 @@ struct Endpoint
}
};
using BlobClient = Azure::Storage::Blobs::BlobClient;
using BlockBlobClient = Azure::Storage::Blobs::BlockBlobClient;
using RawContainerClient = Azure::Storage::Blobs::BlobContainerClient;
using Azure::Storage::Blobs::ListBlobsOptions;
using Azure::Storage::Blobs::ListBlobsPagedResponse;
using BlobContainerPropertiesRespones = Azure::Response<Azure::Storage::Blobs::Models::BlobContainerProperties>;
/// A wrapper for ContainerClient that correctly handles the prefix of blobs.
/// See AzureBlobStorageEndpoint and processAzureBlobStorageEndpoint for details.
class ContainerClientWrapper
{
public:
ContainerClientWrapper(RawContainerClient client_, String blob_prefix_);
bool IsClientForDisk() const;
BlobClient GetBlobClient(const String & blob_name) const;
BlockBlobClient GetBlockBlobClient(const String & blob_name) const;
BlobContainerPropertiesRespones GetProperties() const;
ListBlobsPagedResponse ListBlobs(const ListBlobsOptions & options) const;
private:
RawContainerClient client;
fs::path blob_prefix;
};
using ContainerClient = ContainerClientWrapper;
using ServiceClient = Azure::Storage::Blobs::BlobServiceClient;
using BlobClientOptions = Azure::Storage::Blobs::BlobClientOptions;
using ConnectionString = StrongTypedef<String, struct ConnectionStringTag>;
using AuthMethod = std::variant<
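The wrapper above makes the configured blob prefix transparent to callers: outgoing blob names get the prefix prepended, and names returned by listing get it stripped again. A standalone sketch of just that path handling, with hypothetical types rather than the Azure SDK:

```cpp
#include <filesystem>
#include <stdexcept>
#include <string>
#include <vector>

namespace fs = std::filesystem;

/// Sketch only: mirror the prefix handling of ContainerClientWrapper without the SDK.
struct PrefixedNames
{
    fs::path blob_prefix;   /// e.g. "data/shard_0"

    std::string qualify(const std::string & blob_name) const
    {
        return (blob_prefix / blob_name).string();   /// the name the server is actually asked for
    }

    std::vector<std::string> stripPrefix(std::vector<std::string> listed) const
    {
        const std::string prefix_str = blob_prefix.empty() ? "" : blob_prefix.string() + "/";
        for (auto & name : listed)
        {
            if (!name.starts_with(prefix_str))
                throw std::runtime_error("Expected prefix '" + prefix_str + "' in blob name '" + name + "'");
            name = name.substr(prefix_str.size());   /// callers see names relative to the prefix
        }
        return listed;
    }
};
```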

View File

@ -13,7 +13,6 @@
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#include <Disks/ObjectStorages/ObjectStorageIteratorAsync.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
@ -52,7 +51,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync
public:
AzureIteratorAsync(
const std::string & path_prefix,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client_,
std::shared_ptr<const AzureBlobStorage::ContainerClient> client_,
size_t max_list_size)
: IObjectStorageIteratorAsync(
CurrentMetrics::ObjectStorageAzureThreads,
@ -69,7 +68,7 @@ private:
bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client->GetClickhouseOptions().IsClientForDisk)
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
batch.clear();
@ -97,7 +96,7 @@ private:
return true;
}
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
Azure::Storage::Blobs::ListBlobsOptions options;
};
@ -130,7 +129,7 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
try
@ -159,9 +158,6 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
{
auto client_ptr = client.get();
/// NOTE: list doesn't work if endpoint contains non-empty prefix for blobs.
/// See AzureBlobStorageEndpoint and processAzureBlobStorageEndpoint for details.
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = path;
if (max_keys)
@ -172,7 +168,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
for (auto blob_list_response = client_ptr->ListBlobs(options); blob_list_response.HasPage(); blob_list_response.MoveToNextPage())
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
blob_list_response = client_ptr->ListBlobs(options);
@ -246,10 +242,13 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
std::move(scheduler));
}
void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists)
void AzureObjectStorage::removeObjectImpl(
const StoredObject & object,
const std::shared_ptr<const AzureBlobStorage::ContainerClient> & client_ptr,
bool if_exists)
{
ProfileEvents::increment(ProfileEvents::AzureDeleteObjects);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureDeleteObjects);
const auto & path = object.remote_path;
@ -257,7 +256,7 @@ void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const Sha
try
{
auto delete_info = client_ptr->DeleteBlob(path);
auto delete_info = client_ptr->GetBlobClient(path).Delete();
if (!if_exists && !delete_info.Value.Deleted)
throw Exception(
ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file (path: {}) in AzureBlob Storage, reason: {}",
@ -268,7 +267,7 @@ void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const Sha
if (!if_exists)
throw;
/// If object doesn't exist...
/// If object doesn't exist.
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
return;
@ -298,7 +297,7 @@ ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) c
auto properties = blob_client.GetProperties().Value;
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
ObjectMetadata result;
@ -332,7 +331,7 @@ void AzureObjectStorage::copyObject( /// NOLINT
}
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (client_ptr->GetClickhouseOptions().IsClientForDisk)
if (client_ptr->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
dest_blob_client.CopyFromUri(source_blob_client.GetUrl(), copy_options);
@ -350,7 +349,7 @@ void AzureObjectStorage::applyNewSettings(
if (!options.allow_client_change)
return;
bool is_client_for_disk = client.get()->GetClickhouseOptions().IsClientForDisk;
bool is_client_for_disk = client.get()->IsClientForDisk();
AzureBlobStorage::ConnectionParams params
{
@ -371,7 +370,7 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(
ContextPtr context)
{
auto new_settings = AzureBlobStorage::getRequestSettings(config, config_prefix, context);
bool is_client_for_disk = client.get()->GetClickhouseOptions().IsClientForDisk;
bool is_client_for_disk = client.get()->IsClientForDisk();
AzureBlobStorage::ConnectionParams params
{
@ -381,7 +380,7 @@ std::unique_ptr<IObjectStorage> AzureObjectStorage::cloneObjectStorage(
};
auto new_client = AzureBlobStorage::getContainerClient(params, /*readonly=*/ true);
return std::make_unique<AzureObjectStorage>(name, std::move(new_client), std::move(new_settings), new_namespace, params.endpoint.getEndpointWithoutContainer());
return std::make_unique<AzureObjectStorage>(name, std::move(new_client), std::move(new_settings), new_namespace, params.endpoint.getServiceEndpoint());
}
}

View File

@ -100,8 +100,10 @@ public:
bool supportParallelWrite() const override { return true; }
private:
using SharedAzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
void removeObjectImpl(const StoredObject & object, const SharedAzureClientPtr & client_ptr, bool if_exists);
void removeObjectImpl(
const StoredObject & object,
const std::shared_ptr<const AzureBlobStorage::ContainerClient> & client_ptr,
bool if_exists);
const String name;
/// client used to access the files in the Blob Storage cloud

View File

@ -118,7 +118,7 @@ public:
const FileCacheSettings & getCacheSettings() const { return cache_settings; }
#if USE_AZURE_BLOB_STORAGE
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() const override
std::shared_ptr<const AzureBlobStorage::ContainerClient> getAzureBlobStorageClient() const override
{
return object_storage->getAzureBlobStorageClient();
}

View File

@ -28,8 +28,7 @@
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Common/MultiVersion.h>
#include <azure/storage/blobs.hpp>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h>
#endif
#if USE_AWS_S3
@ -256,7 +255,7 @@ public:
virtual void setKeysGenerator(ObjectStorageKeysGeneratorPtr) { }
#if USE_AZURE_BLOB_STORAGE
virtual std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() const
virtual std::shared_ptr<const AzureBlobStorage::ContainerClient> getAzureBlobStorageClient() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "This function is only implemented for AzureBlobStorage");
}

View File

@ -12,12 +12,6 @@
#include <Common/filesystemHelpers.h>
#include <filesystem>
#include <memory>
#include <optional>
#include <tuple>
#include <unordered_set>
#include <Poco/Timestamp.h>
namespace DB
{

View File

@ -307,7 +307,7 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
ObjectStorageType::Azure, config, config_prefix, name,
AzureBlobStorage::getContainerClient(params, /*readonly=*/ false), std::move(azure_settings),
params.endpoint.prefix.empty() ? params.endpoint.container_name : params.endpoint.container_name + "/" + params.endpoint.prefix,
params.endpoint.getEndpointWithoutContainer());
params.endpoint.getServiceEndpoint());
};
factory.registerObjectStorageType("azure_blob_storage", creator);

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>

View File

@ -42,7 +42,7 @@ namespace
public:
UploadHelper(
const CreateReadBuffer & create_read_buffer_,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client_,
std::shared_ptr<const AzureBlobStorage::ContainerClient> client_,
size_t offset_,
size_t total_size_,
const String & dest_container_for_logging_,
@ -67,7 +67,7 @@ namespace
protected:
std::function<std::unique_ptr<SeekableReadBuffer>()> create_read_buffer;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
std::shared_ptr<const AzureBlobStorage::ContainerClient> client;
size_t offset;
size_t total_size;
const String & dest_container_for_logging;
@ -159,7 +159,7 @@ namespace
{
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
ProfileEvents::increment(ProfileEvents::AzureCommitBlockList);
if (client->GetClickhouseOptions().IsClientForDisk)
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCommitBlockList);
block_blob_client.CommitBlockList(block_ids);
@ -271,7 +271,7 @@ namespace
void processUploadPartRequest(UploadPartTask & task)
{
ProfileEvents::increment(ProfileEvents::AzureStageBlock);
if (client->GetClickhouseOptions().IsClientForDisk)
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureStageBlock);
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
@ -322,7 +322,7 @@ void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> dest_client,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,
@ -335,8 +335,8 @@ void copyDataToAzureBlobStorageFile(
void copyAzureBlobStorageFile(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> src_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> dest_client,
const String & src_container_for_logging,
const String & src_blob,
size_t offset,
@ -353,7 +353,7 @@ void copyAzureBlobStorageFile(
{
LOG_TRACE(log, "Copying Blob: {} from Container: {} using native copy", src_container_for_logging, src_blob);
ProfileEvents::increment(ProfileEvents::AzureCopyObject);
if (dest_client->GetClickhouseOptions().IsClientForDisk)
if (dest_client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureCopyObject);
auto block_blob_client_src = src_client->GetBlockBlobClient(src_blob);

View File

@ -20,8 +20,8 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// Copies a file from AzureBlobStorage to AzureBlobStorage.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
void copyAzureBlobStorageFile(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> src_client,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> dest_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> src_client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> dest_client,
const String & src_container_for_logging,
const String & src_blob,
size_t src_offset,
@ -42,7 +42,7 @@ void copyDataToAzureBlobStorageFile(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client,
std::shared_ptr<const AzureBlobStorage::ContainerClient> client,
const String & dest_container_for_logging,
const String & dest_blob,
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings,

View File

@ -1,13 +1,22 @@
#include <IO/WriteHelpers.h>
#include <cinttypes>
#include <utility>
#include <Common/formatIPv6.h>
#include <base/DecomposedFloat.h>
#include <base/hex.h>
#include <Common/formatIPv6.h>
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wsign-compare"
#include <dragonbox/dragonbox_to_chars.h>
#pragma clang diagnostic pop
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER;
}
template <typename IteratorSrc, typename IteratorDst>
void formatHex(IteratorSrc src, IteratorDst dst, size_t num_bytes)
{
@ -127,4 +136,38 @@ String fourSpaceIndent(size_t indent)
{
return std::string(indent * 4, ' ');
}
template <typename T>
requires is_floating_point<T>
size_t writeFloatTextFastPath(T x, char * buffer)
{
Int64 result = 0;
if constexpr (std::is_same_v<T, Float64>)
{
/// The library dragonbox has low performance on integers.
/// This workaround improves performance 6..10 times.
if (DecomposedFloat64(x).isIntegerInRepresentableRange())
result = itoa(Int64(x), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(x, buffer) - buffer;
}
else if constexpr (std::is_same_v<T, Float32> || std::is_same_v<T, BFloat16>)
{
Float32 f32 = Float32(x);
if (DecomposedFloat32(f32).isIntegerInRepresentableRange())
result = itoa(Int32(f32), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(f32, buffer) - buffer;
}
if (result <= 0)
throw Exception(ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER, "Cannot print floating point number");
return result;
}
template size_t writeFloatTextFastPath(Float64 x, char * buffer);
template size_t writeFloatTextFastPath(Float32 x, char * buffer);
template size_t writeFloatTextFastPath(BFloat16 x, char * buffer);
}
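The change above moves the body of `writeFloatTextFastPath` out of the header: the header keeps only a declaration plus `extern template` declarations, while the definition and the explicit instantiations live in this translation unit, so the dragonbox dependency is no longer pulled into every includer. A minimal sketch of that header/source split with a hypothetical function (names and signatures are illustrative only):

```cpp
// ---- format_value.h (sketch) ----
#pragma once
#include <cstddef>

template <typename T>
size_t formatValue(T x, char * buffer);   /// declaration only; the definition is out of line

extern template size_t formatValue(float x, char * buffer);    /// suppress implicit instantiation
extern template size_t formatValue(double x, char * buffer);

// ---- format_value.cpp (sketch) ----
#include <cstdio>

template <typename T>
size_t formatValue(T x, char * buffer)   /// buffer is assumed to hold at least 64 bytes
{
    /// The heavy formatting dependency stays in this translation unit only.
    return static_cast<size_t>(std::snprintf(buffer, 64, "%g", static_cast<double>(x)));
}

template size_t formatValue(float x, char * buffer);     /// explicit instantiations
template size_t formatValue(double x, char * buffer);
```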

View File

@ -4,8 +4,6 @@
#include <cstdio>
#include <limits>
#include <algorithm>
#include <iterator>
#include <concepts>
#include <bit>
#include <pcg-random/pcg_random.hpp>
@ -18,19 +16,14 @@
#include <Common/transformEndianness.h>
#include <base/find_symbols.h>
#include <base/StringRef.h>
#include <base/DecomposedFloat.h>
#include <Core/DecimalFunctions.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <base/IPv4andIPv6.h>
#include <Common/Exception.h>
#include <Common/StringUtils.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteIntText.h>
#include <IO/VarInt.h>
@ -38,24 +31,11 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-parameter"
#pragma clang diagnostic ignored "-Wsign-compare"
#include <dragonbox/dragonbox_to_chars.h>
#pragma clang diagnostic pop
#include <Formats/FormatSettings.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER;
}
/// Helper functions for formatted and binary output.
inline void writeChar(char x, WriteBuffer & buf)
@ -151,41 +131,11 @@ inline void writeBoolText(bool x, WriteBuffer & buf)
template <typename T>
requires is_floating_point<T>
inline size_t writeFloatTextFastPath(T x, char * buffer)
{
Int64 result = 0;
size_t writeFloatTextFastPath(T x, char * buffer);
if constexpr (std::is_same_v<T, Float64>)
{
/// The library Ryu has low performance on integers.
/// This workaround improves performance 6..10 times.
if (DecomposedFloat64(x).isIntegerInRepresentableRange())
result = itoa(Int64(x), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(x, buffer) - buffer;
}
else if constexpr (std::is_same_v<T, Float32>)
{
if (DecomposedFloat32(x).isIntegerInRepresentableRange())
result = itoa(Int32(x), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(x, buffer) - buffer;
}
else if constexpr (std::is_same_v<T, BFloat16>)
{
Float32 f32 = Float32(x);
if (DecomposedFloat32(f32).isIntegerInRepresentableRange())
result = itoa(Int32(f32), buffer) - buffer;
else
result = jkj::dragonbox::to_chars_n(f32, buffer) - buffer;
}
if (result <= 0)
throw Exception(ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER, "Cannot print floating point number");
return result;
}
extern template size_t writeFloatTextFastPath(Float64 x, char * buffer);
extern template size_t writeFloatTextFastPath(Float32 x, char * buffer);
extern template size_t writeFloatTextFastPath(BFloat16 x, char * buffer);
template <typename T>
requires is_floating_point<T>

View File

@ -405,8 +405,8 @@ ASTPtr parseAdditionalFilterConditionForTable(
for (const auto & additional_filter : additional_table_filters)
{
const auto & tuple = additional_filter.safeGet<const Tuple &>();
auto & table = tuple.at(0).safeGet<String>();
auto & filter = tuple.at(1).safeGet<String>();
const auto & table = tuple.at(0).safeGet<String>();
const auto & filter = tuple.at(1).safeGet<String>();
if (table == target.alias ||
(table == target.table && context.getCurrentDatabase() == target.database) ||

View File

@ -5,6 +5,7 @@
#include <Interpreters/IJoin.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <Core/Joins.h>
namespace DB
{

View File

@ -440,7 +440,7 @@ ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) co
}
// If the original column is DateTime64, check for sub-second precision
if (isDateTime64(column_to_cast.column->getDataType()))
if (isDateTime64(column_to_cast.column->getDataType()) && !isDateTime64(removeNullable(result)->getDataType()))
{
processDateTime64Column(column_to_cast, result, null_map_holder, null_map);
}

View File

@ -1540,7 +1540,7 @@ std::pair<ASTPtr, BlockIO> executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
if (format_name == "Null")
if (boost::iequals(format_name, "Null"))
res.null_format = true;
}

View File

@ -70,6 +70,9 @@
#include <Planner/CollectSets.h>
#include <Planner/CollectTableExpressionData.h>
#include <Common/logger_useful.h>
#include <ranges>
namespace DB
{
namespace Setting
@ -1355,6 +1358,9 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
if (join_strictness == JoinStrictness::All || join_strictness == JoinStrictness::Semi || join_strictness == JoinStrictness::Anti)
join_constant = tryExtractConstantFromJoinNode(join_table_expression);
bool can_move_out_residuals = false;
if (!join_constant && join_node.isOnJoinExpression())
{
join_clauses_and_actions = buildJoinClausesAndActions(left_plan_output_columns,
@ -1362,17 +1368,39 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
join_table_expression,
planner_context);
join_clauses_and_actions.left_join_expressions_actions.appendInputsForUnusedColumns(left_plan.getCurrentHeader());
auto left_join_expressions_actions_step = std::make_unique<ExpressionStep>(left_plan.getCurrentHeader(), std::move(join_clauses_and_actions.left_join_expressions_actions));
left_join_expressions_actions_step->setStepDescription("JOIN actions");
appendSetsFromActionsDAG(left_join_expressions_actions_step->getExpression(), left_join_tree_query_plan.useful_sets);
left_plan.addStep(std::move(left_join_expressions_actions_step));
const auto & left_pre_filters = join_clauses_and_actions.join_clauses[0].getLeftFilterConditionNodes();
const auto & right_pre_filters = join_clauses_and_actions.join_clauses[0].getRightFilterConditionNodes();
auto check_pre_filter = [](JoinTableSide side, const auto & pre_filters)
{
if (!pre_filters.empty() && pre_filters.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected only one {} pre-filter condition node. Actual [{}]",
side, fmt::join(pre_filters | std::views::transform([](const auto & node) { return node->result_name; }), ", "));
};
check_pre_filter(JoinTableSide::Left, left_pre_filters);
check_pre_filter(JoinTableSide::Right, right_pre_filters);
join_clauses_and_actions.right_join_expressions_actions.appendInputsForUnusedColumns(right_plan.getCurrentHeader());
auto right_join_expressions_actions_step = std::make_unique<ExpressionStep>(right_plan.getCurrentHeader(), std::move(join_clauses_and_actions.right_join_expressions_actions));
right_join_expressions_actions_step->setStepDescription("JOIN actions");
appendSetsFromActionsDAG(right_join_expressions_actions_step->getExpression(), right_join_tree_query_plan.useful_sets);
right_plan.addStep(std::move(right_join_expressions_actions_step));
can_move_out_residuals = join_clauses_and_actions.join_clauses.size() == 1
&& join_strictness == JoinStrictness::All
&& (join_kind == JoinKind::Inner || join_kind == JoinKind::Cross || join_kind == JoinKind::Comma)
&& (right_pre_filters.empty() || FilterStep::canUseType(right_pre_filters[0]->result_type))
&& (left_pre_filters.empty() || FilterStep::canUseType(left_pre_filters[0]->result_type));
auto add_pre_filter = [can_move_out_residuals](ActionsDAG & join_expressions_actions, QueryPlan & plan, UsefulSets & useful_sets, const auto & pre_filters)
{
join_expressions_actions.appendInputsForUnusedColumns(plan.getCurrentHeader());
appendSetsFromActionsDAG(join_expressions_actions, useful_sets);
QueryPlanStepPtr join_expressions_actions_step;
if (can_move_out_residuals && !pre_filters.empty())
join_expressions_actions_step = std::make_unique<FilterStep>(plan.getCurrentHeader(), std::move(join_expressions_actions), pre_filters[0]->result_name, false);
else
join_expressions_actions_step = std::make_unique<ExpressionStep>(plan.getCurrentHeader(), std::move(join_expressions_actions));
join_expressions_actions_step->setStepDescription("JOIN actions");
plan.addStep(std::move(join_expressions_actions_step));
};
add_pre_filter(join_clauses_and_actions.left_join_expressions_actions, left_plan, left_join_tree_query_plan.useful_sets, left_pre_filters);
add_pre_filter(join_clauses_and_actions.right_join_expressions_actions, right_plan, right_join_tree_query_plan.useful_sets, right_pre_filters);
}
std::unordered_map<ColumnIdentifier, DataTypePtr> left_plan_column_name_to_cast_type;
@ -1484,12 +1512,11 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
const auto & join_clauses = join_clauses_and_actions.join_clauses;
bool is_asof = table_join->strictness() == JoinStrictness::Asof;
if (join_clauses.size() > 1)
if (join_clauses.size() != 1 && is_asof)
{
if (is_asof)
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"ASOF join {} doesn't support multiple ORs for keys in JOIN ON section",
join_node.formatASTForErrorMessage());
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"ASOF join doesn't support JOIN ON expression {}",
join_node.formatASTForErrorMessage());
}
auto & table_join_clauses = table_join->getClauses();
@ -1497,7 +1524,6 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
for (const auto & join_clause : join_clauses)
{
table_join_clauses.emplace_back();
auto & table_join_clause = table_join_clauses.back();
const auto & join_clause_left_key_nodes = join_clause.getLeftKeyNodes();
const auto & join_clause_right_key_nodes = join_clause.getRightKeyNodes();
@ -1505,6 +1531,22 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
size_t join_clause_key_nodes_size = join_clause_left_key_nodes.size();
chassert(join_clause_key_nodes_size == join_clause_right_key_nodes.size());
if (join_clause_key_nodes_size == 0 && !can_move_out_residuals)
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "Cannot determine join keys in {}", join_node.formatASTForErrorMessage());
/// If there are no keys, but only conditions that cannot be used as keys, then it is a cross join.
/// Example: SELECT * FROM t1 JOIN t2 ON t1.x > t2.y
/// Same as: SELECT * FROM t1 CROSS JOIN t2 WHERE t1.x > t2.y
if (join_clause_key_nodes_size == 0 && can_move_out_residuals)
{
join_kind = JoinKind::Cross;
table_join->getTableJoin().kind = join_kind;
table_join->setIsJoinWithConstant(true);
table_join_clauses.pop_back();
continue;
}
auto & table_join_clause = table_join_clauses.back();
for (size_t i = 0; i < join_clause_key_nodes_size; ++i)
{
table_join_clause.addKey(join_clause_left_key_nodes[i]->result_name,
@ -1521,8 +1563,11 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
join_node.formatASTForErrorMessage(),
join_clause_get_left_filter_condition_nodes.size());
const auto & join_clause_left_filter_condition_name = join_clause_get_left_filter_condition_nodes[0]->result_name;
table_join_clause.analyzer_left_filter_condition_column_name = join_clause_left_filter_condition_name;
if (!can_move_out_residuals)
{
const auto & join_clause_left_filter_condition_name = join_clause_get_left_filter_condition_nodes[0]->result_name;
table_join_clause.analyzer_left_filter_condition_column_name = join_clause_left_filter_condition_name;
}
}
const auto & join_clause_get_right_filter_condition_nodes = join_clause.getRightFilterConditionNodes();
@ -1534,8 +1579,11 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
join_node.formatASTForErrorMessage(),
join_clause_get_right_filter_condition_nodes.size());
const auto & join_clause_right_filter_condition_name = join_clause_get_right_filter_condition_nodes[0]->result_name;
table_join_clause.analyzer_right_filter_condition_column_name = join_clause_right_filter_condition_name;
if (!can_move_out_residuals)
{
const auto & join_clause_right_filter_condition_name = join_clause_get_right_filter_condition_nodes[0]->result_name;
table_join_clause.analyzer_right_filter_condition_column_name = join_clause_right_filter_condition_name;
}
}
if (is_asof)
@ -1560,14 +1608,16 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
}
}
if (join_clauses_and_actions.mixed_join_expressions_actions)
if (!can_move_out_residuals && join_clauses_and_actions.residual_join_expressions_actions)
{
/// Let join algorithm handle residual conditions
ExpressionActionsPtr & mixed_join_expression = table_join->getMixedJoinExpression();
mixed_join_expression = std::make_shared<ExpressionActions>(
std::move(*join_clauses_and_actions.mixed_join_expressions_actions),
std::move(*join_clauses_and_actions.residual_join_expressions_actions),
ExpressionActionsSettings::fromContext(planner_context->getQueryContext()));
appendSetsFromActionsDAG(mixed_join_expression->getActionsDAG(), left_join_tree_query_plan.useful_sets);
join_clauses_and_actions.residual_join_expressions_actions.reset();
}
}
else if (join_node.isUsingJoinExpression())
@ -1617,7 +1667,6 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
table_join->setUsedColumn(column_from_joined_table, JoinTableSide::Right);
}
if (table_join->getOutputColumns(JoinTableSide::Left).empty() && table_join->getOutputColumns(JoinTableSide::Right).empty())
{
/// We should add all duplicated columns, because the join algorithm adds either all columns with the specified name or none
@ -1722,13 +1771,20 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
auto join_pipeline_type = join_algorithm->pipelineType();
ColumnIdentifierSet outer_scope_columns_nonempty;
if (outer_scope_columns.empty())
ColumnIdentifierSet required_columns_after_join = outer_scope_columns;
if (join_clauses_and_actions.residual_join_expressions_actions)
{
for (const auto * input : join_clauses_and_actions.residual_join_expressions_actions->getInputs())
required_columns_after_join.insert(input->result_name);
}
if (required_columns_after_join.empty())
{
if (left_header.columns() > 1)
outer_scope_columns_nonempty.insert(left_header.getByPosition(0).name);
required_columns_after_join.insert(left_header.getByPosition(0).name);
else if (right_header.columns() > 1)
outer_scope_columns_nonempty.insert(right_header.getByPosition(0).name);
required_columns_after_join.insert(right_header.getByPosition(0).name);
}
auto join_step = std::make_unique<JoinStep>(
@ -1738,7 +1794,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
settings[Setting::max_block_size],
settings[Setting::min_joined_block_size_bytes],
settings[Setting::max_threads],
outer_scope_columns.empty() ? outer_scope_columns_nonempty : outer_scope_columns,
required_columns_after_join,
false /*optimize_read_in_order*/,
true /*optimize_skip_unused_shards*/);
@ -1753,6 +1809,30 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(
result_plan.unitePlans(std::move(join_step), {std::move(plans)});
}
/// If residual conditions were not moved into the JOIN algorithm,
/// we need to apply them as a WHERE condition after the JOIN
if (join_clauses_and_actions.residual_join_expressions_actions)
{
auto outputs = join_clauses_and_actions.residual_join_expressions_actions->getOutputs();
if (outputs.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 output column in JOIN actions, got {}",
join_clauses_and_actions.residual_join_expressions_actions->dumpDAG());
join_clauses_and_actions.residual_join_expressions_actions->appendInputsForUnusedColumns(result_plan.getCurrentHeader());
for (const auto * input_node : join_clauses_and_actions.residual_join_expressions_actions->getInputs())
join_clauses_and_actions.residual_join_expressions_actions->addOrReplaceInOutputs(*input_node);
appendSetsFromActionsDAG(*join_clauses_and_actions.residual_join_expressions_actions, left_join_tree_query_plan.useful_sets);
auto filter_step = std::make_unique<FilterStep>(result_plan.getCurrentHeader(),
std::move(*join_clauses_and_actions.residual_join_expressions_actions),
outputs[0]->result_name,
/* remove_column = */ false); /// Unused columns will be removed by next step
filter_step->setStepDescription("Residual JOIN filter");
result_plan.addStep(std::move(filter_step));
join_clauses_and_actions.residual_join_expressions_actions.reset();
}
const auto & header_after_join = result_plan.getCurrentHeader();
if (header_after_join.columns() > outer_scope_columns.size())
{

View File

@ -209,7 +209,7 @@ const ActionsDAG::Node * appendExpression(
void buildJoinClause(
ActionsDAG & left_dag,
ActionsDAG & right_dag,
ActionsDAG & mixed_dag,
ActionsDAG & joined_dag,
const PlannerContextPtr & planner_context,
const QueryTreeNodePtr & join_expression,
const TableExpressionSet & left_table_expressions,
@ -230,7 +230,7 @@ void buildJoinClause(
buildJoinClause(
left_dag,
right_dag,
mixed_dag,
joined_dag,
planner_context,
child,
left_table_expressions,
@ -334,8 +334,8 @@ void buildJoinClause(
{
/// expression involves both tables.
/// `expr1(left.col1, right.col2) == expr2(left.col3, right.col4)`
const auto * node = appendExpression(mixed_dag, join_expression, planner_context, join_node);
join_clause.addMixedCondition(node);
const auto * node = appendExpression(joined_dag, join_expression, planner_context, join_node);
join_clause.addResidualCondition(node);
}
else
{
@ -363,21 +363,23 @@ void buildJoinClause(
}
else
{
auto support_mixed_join_condition = planner_context->getQueryContext()->getSettingsRef()[Setting::allow_experimental_join_condition];
auto join_use_nulls = planner_context->getQueryContext()->getSettingsRef()[Setting::join_use_nulls];
/// If join_use_nulls = true, the columns' nullability will be changed later which make this expression not right.
if (support_mixed_join_condition && !join_use_nulls)
/// If join_use_nulls = true, the columns' nullability will be changed later, which makes this expression not applicable.
auto strictness = join_node.getStrictness();
auto kind = join_node.getKind();
bool can_be_moved_out = strictness == JoinStrictness::All
&& (kind == JoinKind::Inner || kind == JoinKind::Cross || kind == JoinKind::Comma);
if (can_be_moved_out || !join_use_nulls)
{
/// expression involves both tables.
const auto * node = appendExpression(mixed_dag, join_expression, planner_context, join_node);
join_clause.addMixedCondition(node);
const auto * node = appendExpression(joined_dag, join_expression, planner_context, join_node);
join_clause.addResidualCondition(node);
}
else
{
throw Exception(
ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
"JOIN {} join expression contains column from left and right table, you may try experimental support of this feature by `SET allow_experimental_join_condition = 1`",
join_node.formatASTForErrorMessage());
"{} JOIN ON expression {} contains column from left and right table, which is not supported with `join_use_nulls`",
toString(join_node.getKind()), join_expression->formatASTForErrorMessage());
}
}
}
@ -391,16 +393,16 @@ JoinClausesAndActions buildJoinClausesAndActions(
{
ActionsDAG left_join_actions(left_table_expression_columns);
ActionsDAG right_join_actions(right_table_expression_columns);
ColumnsWithTypeAndName mixed_table_expression_columns;
ColumnsWithTypeAndName result_relation_columns;
for (const auto & left_column : left_table_expression_columns)
{
mixed_table_expression_columns.push_back(left_column);
result_relation_columns.push_back(left_column);
}
for (const auto & right_column : right_table_expression_columns)
{
mixed_table_expression_columns.push_back(right_column);
result_relation_columns.push_back(right_column);
}
ActionsDAG mixed_join_actions(mixed_table_expression_columns);
ActionsDAG post_join_actions(result_relation_columns);
/** It is possible to have constant value in JOIN ON section, that we need to ignore during DAG construction.
* If we do not ignore it, this function will be replaced by underlying constant.
@ -454,7 +456,7 @@ JoinClausesAndActions buildJoinClausesAndActions(
JoinClausesAndActions result;
bool is_inequal_join = false;
bool has_residual_filters = false;
const auto & function_name = function_node->getFunction()->getName();
if (function_name == "or")
{
@ -465,14 +467,14 @@ JoinClausesAndActions buildJoinClausesAndActions(
buildJoinClause(
left_join_actions,
right_join_actions,
mixed_join_actions,
post_join_actions,
planner_context,
child,
join_left_table_expressions,
join_right_table_expressions,
join_node,
result.join_clauses.back());
is_inequal_join |= !result.join_clauses.back().getMixedFilterConditionNodes().empty();
has_residual_filters |= !result.join_clauses.back().getResidualFilterConditionNodes().empty();
}
}
else
@ -482,14 +484,14 @@ JoinClausesAndActions buildJoinClausesAndActions(
buildJoinClause(
left_join_actions,
right_join_actions,
mixed_join_actions,
post_join_actions,
planner_context,
join_expression,
join_left_table_expressions,
join_right_table_expressions,
join_node,
result.join_clauses.back());
is_inequal_join |= !result.join_clauses.back().getMixedFilterConditionNodes().empty();
has_residual_filters |= !result.join_clauses.back().getResidualFilterConditionNodes().empty();
}
auto and_function = FunctionFactory::instance().get("and", planner_context->getQueryContext());
@ -546,10 +548,6 @@ JoinClausesAndActions buildJoinClausesAndActions(
assert(join_clause.getLeftKeyNodes().size() == join_clause.getRightKeyNodes().size());
size_t join_clause_key_nodes_size = join_clause.getLeftKeyNodes().size();
if (join_clause_key_nodes_size == 0)
throw Exception(ErrorCodes::INVALID_JOIN_ON_EXPRESSION, "Cannot determine join keys in {}",
join_node.formatASTForErrorMessage());
for (size_t i = 0; i < join_clause_key_nodes_size; ++i)
{
auto & left_key_node = join_clause.getLeftKeyNodes()[i];
@ -614,35 +612,35 @@ JoinClausesAndActions buildJoinClausesAndActions(
result.right_join_tmp_expression_actions = std::move(right_join_actions);
result.right_join_expressions_actions.removeUnusedActions(join_right_actions_names);
if (is_inequal_join)
if (has_residual_filters)
{
/// In case of multiple disjuncts and any inequal join condition, we need to build full join on expression actions.
/// So, for each row, we recalculate the value of the whole expression from JOIN ON to check if rows should be joined.
if (result.join_clauses.size() > 1)
{
ActionsDAG mixed_join_expressions_actions(mixed_table_expression_columns);
ActionsDAG residual_join_expressions_actions(result_relation_columns);
PlannerActionsVisitor join_expression_visitor(planner_context);
auto join_expression_dag_node_raw_pointers = join_expression_visitor.visit(mixed_join_expressions_actions, join_expression);
auto join_expression_dag_node_raw_pointers = join_expression_visitor.visit(residual_join_expressions_actions, join_expression);
if (join_expression_dag_node_raw_pointers.size() != 1)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "JOIN {} ON clause contains multiple expressions", join_node.formatASTForErrorMessage());
mixed_join_expressions_actions.addOrReplaceInOutputs(*join_expression_dag_node_raw_pointers[0]);
residual_join_expressions_actions.addOrReplaceInOutputs(*join_expression_dag_node_raw_pointers[0]);
Names required_names{join_expression_dag_node_raw_pointers[0]->result_name};
mixed_join_expressions_actions.removeUnusedActions(required_names);
result.mixed_join_expressions_actions = std::move(mixed_join_expressions_actions);
residual_join_expressions_actions.removeUnusedActions(required_names);
result.residual_join_expressions_actions = std::move(residual_join_expressions_actions);
}
else
{
const auto & join_clause = result.join_clauses.front();
const auto & mixed_filter_condition_nodes = join_clause.getMixedFilterConditionNodes();
auto mixed_join_expressions_actions = ActionsDAG::buildFilterActionsDAG(mixed_filter_condition_nodes, {}, true);
result.mixed_join_expressions_actions = std::move(mixed_join_expressions_actions);
const auto & residual_filter_condition_nodes = join_clause.getResidualFilterConditionNodes();
auto residual_join_expressions_actions = ActionsDAG::buildFilterActionsDAG(residual_filter_condition_nodes, {}, true);
result.residual_join_expressions_actions = std::move(residual_join_expressions_actions);
}
auto outputs = result.mixed_join_expressions_actions->getOutputs();
auto outputs = result.residual_join_expressions_actions->getOutputs();
if (outputs.size() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only one output is expected, got: {}", result.mixed_join_expressions_actions->dumpDAG());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only one output is expected, got: {}", result.residual_join_expressions_actions->dumpDAG());
}
auto output_type = removeNullable(outputs[0]->result_type);
WhichDataType which_type(output_type);
@ -650,8 +648,8 @@ JoinClausesAndActions buildJoinClausesAndActions(
{
DataTypePtr uint8_ty = std::make_shared<DataTypeUInt8>();
auto true_col = ColumnWithTypeAndName(uint8_ty->createColumnConst(1, 1), uint8_ty, "true");
const auto * true_node = &result.mixed_join_expressions_actions->addColumn(true_col);
result.mixed_join_expressions_actions = ActionsDAG::buildFilterActionsDAG({outputs[0], true_node});
const auto * true_node = &result.residual_join_expressions_actions->addColumn(true_col);
result.residual_join_expressions_actions = ActionsDAG::buildFilterActionsDAG({outputs[0], true_node});
}
}
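For orientation, a minimal SQL sketch of the class of queries these residual actions serve; the table and column names are hypothetical. The second ON conjunct references both sides and cannot become a join key, so it ends up either in the join algorithm's residual expression or in a post-join filter, as wired up above.
```sql
-- Hypothetical tables t1/t2: the equality becomes the join key, while the
-- inequality stays as a residual condition evaluated per matched row.
SELECT *
FROM t1
INNER JOIN t2 ON t1.id = t2.id AND t1.a < t2.b;
```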

View File

@ -142,19 +142,19 @@ public:
return right_filter_condition_nodes;
}
ActionsDAG::NodeRawConstPtrs & getMixedFilterConditionNodes()
ActionsDAG::NodeRawConstPtrs & getResidualFilterConditionNodes()
{
return mixed_filter_condition_nodes;
return residual_filter_condition_nodes;
}
void addMixedCondition(const ActionsDAG::Node * condition_node)
void addResidualCondition(const ActionsDAG::Node * condition_node)
{
mixed_filter_condition_nodes.push_back(condition_node);
residual_filter_condition_nodes.push_back(condition_node);
}
const ActionsDAG::NodeRawConstPtrs & getMixedFilterConditionNodes() const
const ActionsDAG::NodeRawConstPtrs & getResidualFilterConditionNodes() const
{
return mixed_filter_condition_nodes;
return residual_filter_condition_nodes;
}
/// Dump clause into buffer
@ -172,7 +172,7 @@ private:
ActionsDAG::NodeRawConstPtrs left_filter_condition_nodes;
ActionsDAG::NodeRawConstPtrs right_filter_condition_nodes;
/// conditions which involve both left and right tables
ActionsDAG::NodeRawConstPtrs mixed_filter_condition_nodes;
ActionsDAG::NodeRawConstPtrs residual_filter_condition_nodes;
std::unordered_set<size_t> nullsafe_compare_key_indexes;
};
@ -192,7 +192,7 @@ struct JoinClausesAndActions
ActionsDAG right_join_expressions_actions;
/// Originally used for inequal joins. It's the total join expression.
/// If there are no inequal join conditions, it's null.
std::optional<ActionsDAG> mixed_join_expressions_actions;
std::optional<ActionsDAG> residual_join_expressions_actions;
};
/** Calculate join clauses and actions for JOIN ON section.

View File

@ -248,6 +248,13 @@ void FilterStep::updateOutputHeader()
return;
}
bool FilterStep::canUseType(const DataTypePtr & filter_type)
{
return FilterTransform::canUseType(filter_type);
}
void FilterStep::serialize(Serialization & ctx) const
{
UInt8 flags = 0;

View File

@ -26,6 +26,8 @@ public:
const String & getFilterColumnName() const { return filter_column_name; }
bool removesFilterColumn() const { return remove_filter_column; }
static bool canUseType(const DataTypePtr & type);
void serialize(Serialization & ctx) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);

View File

@ -142,7 +142,7 @@ size_t TokenInfo::getTotalSize() const
return size + parts.size() - 1;
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
void CheckTokenTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();

View File

@ -93,7 +93,7 @@ namespace DeduplicationToken
};
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
/// use that class only with debug builds in CI for introspection
class CheckTokenTransform : public ISimpleTransform
{

View File

@ -15,13 +15,18 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
}
bool FilterTransform::canUseType(const DataTypePtr & filter_type)
{
return filter_type->onlyNull() || isUInt8(removeLowCardinalityAndNullable(filter_type));
}
Block FilterTransform::transformHeader(
const Block & header, const ActionsDAG * expression, const String & filter_column_name, bool remove_filter_column)
{
Block result = expression ? expression->updateHeader(header) : header;
auto filter_type = result.getByName(filter_column_name).type;
if (!filter_type->onlyNull() && !isUInt8(removeNullable(removeLowCardinality(filter_type))))
if (!canUseType(filter_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,
"Illegal type {} of column {} for filter. Must be UInt8 or Nullable(UInt8).",
filter_type->getName(), filter_column_name);
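As a rough illustration of the type check above (the queries are assumptions; only the error code name comes from this hunk): the filter column must be UInt8 after stripping Nullable and LowCardinality, otherwise the query is rejected.
```sql
-- Assumed examples: a UInt8 condition passes the check, a String condition
-- is expected to fail with ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER.
SELECT 1 WHERE 1 = 1;
SELECT 1 WHERE 'not a flag'; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
```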

View File

@ -31,6 +31,8 @@ public:
void transform(Chunk & chunk) override;
static bool canUseType(const DataTypePtr & type);
private:
ExpressionActionsPtr expression;
String filter_column_name;

View File

@ -381,7 +381,7 @@ std::optional<Chain> generateViewChain(
table_prefers_large_blocks ? settings[Setting::min_insert_block_size_bytes] : 0ULL));
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Before squashing", out.getInputHeader()));
#endif
@ -427,7 +427,7 @@ std::optional<Chain> generateViewChain(
if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right after Inner query", out.getInputHeader()));
#endif
@ -450,7 +450,7 @@ std::optional<Chain> generateViewChain(
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right before Inner query", out.getInputHeader()));
#endif
}

View File

@ -364,7 +364,7 @@ void RefreshTask::refreshTask()
if (coordination.root_znode.last_attempt_replica == coordination.replica_name)
{
LOG_ERROR(log, "Znode {} indicates that this replica is running a refresh, but it isn't. Likely a bug.", coordination.path + "/running");
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
abortOnFailedAssertion("Unexpected refresh lock in keeper");
#else
coordination.running_znode_exists = false;

View File

@ -88,7 +88,8 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, Merg
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const IReservation * reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
path = fs::path(storage.getFullPathOnDisk(reservation->getDisk())) / name;
path += "/";
}
}

View File

@ -387,7 +387,7 @@ IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::getIndex() const
IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::loadIndexToCache(PrimaryIndexCache & index_cache) const
{
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
auto callback = [this] { return loadIndex(); };
return index_cache.getOrSet(key, callback);
}
@ -398,7 +398,7 @@ void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
if (!index)
return;
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
index_cache.set(key, std::const_pointer_cast<Index>(index));
index.reset();
@ -406,6 +406,15 @@ void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
projection->moveIndexToCache(index_cache);
}
void IMergeTreeDataPart::removeIndexFromCache(PrimaryIndexCache * index_cache) const
{
if (!index_cache)
return;
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
index_cache->remove(key);
}
void IMergeTreeDataPart::setIndex(Columns index_columns)
{
std::scoped_lock lock(index_mutex);
@ -574,17 +583,49 @@ bool IMergeTreeDataPart::isMovingPart() const
return part_directory_path.parent_path().filename() == "moving";
}
void IMergeTreeDataPart::clearCaches()
{
if (cleared_data_in_caches.exchange(true) || is_duplicate)
return;
size_t uncompressed_bytes = getBytesUncompressedOnDisk();
/// Remove index and marks from cache if it was prewarmed to avoid thrashing it with outdated data.
/// Do not remove in other cases to avoid extra contention on caches.
removeMarksFromCache(storage.getMarkCacheToPrewarm(uncompressed_bytes).get());
removeIndexFromCache(storage.getPrimaryIndexCacheToPrewarm(uncompressed_bytes).get());
}
bool IMergeTreeDataPart::mayStoreDataInCaches() const
{
size_t uncompressed_bytes = getBytesUncompressedOnDisk();
auto mark_cache = storage.getMarkCacheToPrewarm(uncompressed_bytes);
auto index_cache = storage.getPrimaryIndexCacheToPrewarm(uncompressed_bytes);
return (mark_cache || index_cache) && !cleared_data_in_caches;
}
void IMergeTreeDataPart::removeIfNeeded() noexcept
{
assert(assertHasValidVersionMetadata());
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
std::string path;
try
{
path = getDataPartStorage().getRelativePath();
clearCaches();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
}
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
try
{
if (!getDataPartStorage().exists()) // path
return;
@ -2113,6 +2154,11 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const S
return {};
}
String IMergeTreeDataPart::getRelativePathOfActivePart() const
{
return fs::path(getDataPartStorage().getFullRootPath()) / name / "";
}
void IMergeTreeDataPart::renameToDetached(const String & prefix)
{
auto path_to_detach = getRelativePathForDetachedPart(prefix, /* broken */ false);

View File

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <unordered_map>
#include <IO/WriteSettings.h>
#include <Core/Block.h>
@ -185,6 +186,15 @@ public:
/// Loads marks and saves them into mark cache for specified columns.
virtual void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const = 0;
/// Removes marks from cache for all columns in part.
virtual void removeMarksFromCache(MarkCache * mark_cache) const = 0;
/// Removes data related to data part from mark and primary index caches.
void clearCaches();
/// Returns true if data related to data part may be stored in mark and primary index caches.
bool mayStoreDataInCaches() const;
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
@ -376,6 +386,7 @@ public:
IndexPtr getIndex() const;
IndexPtr loadIndexToCache(PrimaryIndexCache & index_cache) const;
void moveIndexToCache(PrimaryIndexCache & index_cache);
void removeIndexFromCache(PrimaryIndexCache * index_cache) const;
void setIndex(Columns index_columns);
void unloadIndex();
@ -436,6 +447,10 @@ public:
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
/// This method ignores current tmp prefix of part and returns
/// the name of part when it was or will be in Active state.
String getRelativePathOfActivePart() const;
bool isProjectionPart() const { return parent_part != nullptr; }
/// Check if the part is in the `/moving` directory
@ -757,6 +772,9 @@ private:
/// This ugly flag is needed for debug assertions only
mutable bool part_is_probably_removed_from_disk = false;
/// If it's true then data related to this part is cleared from mark and index caches.
mutable std::atomic_bool cleared_data_in_caches = false;
};
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;

View File

@ -120,10 +120,4 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
return remove_files;
}
PlainMarksByName IMergedBlockOutputStream::releaseCachedMarks()
{
if (!writer)
return {};
return writer->releaseCachedMarks();
}
}

View File

@ -35,7 +35,10 @@ public:
return writer->getIndexGranularity();
}
PlainMarksByName releaseCachedMarks();
PlainMarksByName releaseCachedMarks()
{
return writer ? writer->releaseCachedMarks() : PlainMarksByName{};
}
size_t getNumberOfOpenStreams() const
{
@ -43,7 +46,6 @@ public:
}
protected:
/// Remove all columns marked expired in data_part. Also, clears checksums
/// and columns array. Return set of removed files names.
NameSet removeEmptyColumnsFromPart(

View File

@ -445,12 +445,14 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
if (auto mark_cache = storage.getMarkCacheToPrewarm())
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = storage.getMarkCacheToPrewarm(bytes_uncompressed))
addMarksToCache(*part, cached_marks, mark_cache.get());
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
part->moveIndexToCache(*index_cache);
write_part_log({});

View File

@ -152,13 +152,15 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
if (auto mark_cache = storage.getMarkCacheToPrewarm())
size_t bytes_uncompressed = new_part->getBytesUncompressedOnDisk();
if (auto mark_cache = storage.getMarkCacheToPrewarm(bytes_uncompressed))
{
auto marks = merge_task->releaseCachedMarks();
addMarksToCache(*new_part, marks, mark_cache.get());
}
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.

View File

@ -578,6 +578,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec,
std::move(index_granularity_ptr),
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
global_ctx->merge_list_element_ptr->total_size_bytes_compressed,
/*reset_columns=*/ true,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());
@ -1125,6 +1126,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
ctx->compression_codec,
global_ctx->to->getIndexGranularity(),
global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed,
&global_ctx->written_offset_columns);
ctx->column_elems_written = 0;

View File

@ -238,6 +238,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsBool prewarm_mark_cache;
extern const MergeTreeSettingsBool primary_key_lazy_load;
extern const MergeTreeSettingsBool enforce_index_structure_match_on_partition_manipulation;
extern const MergeTreeSettingsUInt64 min_bytes_to_prewarm_caches;
}
namespace ServerSetting
@ -2366,19 +2367,31 @@ PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCache() const
return getContext()->getPrimaryIndexCache();
}
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm() const
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_primary_key_cache])
return nullptr;
/// Do not load data into caches for small parts because
/// they will likely be replaced by a merge immediately.
size_t min_bytes_to_prewarm = (*getSettings())[MergeTreeSetting::min_bytes_to_prewarm_caches];
if (part_uncompressed_bytes && part_uncompressed_bytes < min_bytes_to_prewarm)
return nullptr;
return getPrimaryIndexCache();
}
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm() const
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return nullptr;
/// Do not load data into caches for small parts because
/// they will likely be replaced by a merge immediately.
size_t min_bytes_to_prewarm = (*getSettings())[MergeTreeSetting::min_bytes_to_prewarm_caches];
if (part_uncompressed_bytes && part_uncompressed_bytes < min_bytes_to_prewarm)
return nullptr;
return getContext()->getMarkCache();
}
@ -9035,7 +9048,8 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
ColumnsStatistics{},
compression_codec,
std::make_shared<MergeTreeIndexGranularityAdaptive>(),
txn ? txn->tid : Tx::PrehistoricTID);
txn ? txn->tid : Tx::PrehistoricTID,
/*part_uncompressed_bytes=*/ 0);
bool sync_on_insert = (*settings)[MergeTreeSetting::fsync_after_insert];
@ -9108,14 +9122,14 @@ void MergeTreeData::unloadPrimaryKeys()
}
}
size_t MergeTreeData::unloadPrimaryKeysOfOutdatedParts()
size_t MergeTreeData::unloadPrimaryKeysAndClearCachesOfOutdatedParts()
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(unload_primary_key_mutex, std::defer_lock);
if (!lock.try_lock())
return 0;
DataPartsVector parts_to_unload_index;
DataPartsVector parts_to_clear;
{
auto parts_lock = lockParts();
@ -9126,18 +9140,22 @@ size_t MergeTreeData::unloadPrimaryKeysOfOutdatedParts()
/// Outdated part may be hold by SELECT query and still needs the index.
/// This check requires lock of index_mutex but if outdated part is unique then there is no
/// contention on it, so it's relatively cheap and it's ok to check under a global parts lock.
if (isSharedPtrUnique(part) && part->isIndexLoaded())
parts_to_unload_index.push_back(part);
if (isSharedPtrUnique(part) && (part->isIndexLoaded() || part->mayStoreDataInCaches()))
parts_to_clear.push_back(part);
}
}
for (const auto & part : parts_to_unload_index)
for (const auto & part : parts_to_clear)
{
const_cast<IMergeTreeDataPart &>(*part).unloadIndex();
auto & part_mut = const_cast<IMergeTreeDataPart &>(*part);
part_mut.unloadIndex();
part_mut.clearCaches();
LOG_TEST(log, "Unloaded primary key for outdated part {}", part->name);
}
return parts_to_unload_index.size();
return parts_to_clear.size();
}
void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)

View File

@ -511,9 +511,9 @@ public:
/// Returns a pointer to primary index cache if it is enabled.
PrimaryIndexCachePtr getPrimaryIndexCache() const;
/// Returns a pointer to primary index cache if it is enabled and required to be prewarmed.
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm() const;
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Returns a pointer to primary mark cache if it is required to be prewarmed.
MarkCachePtr getMarkCacheToPrewarm() const;
MarkCachePtr getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Prewarm mark cache and primary index cache for the most recent data parts.
void prewarmCaches(ThreadPool & pool, MarkCachePtr mark_cache, PrimaryIndexCachePtr index_cache);
@ -1166,7 +1166,7 @@ public:
/// Unloads primary keys of outdated parts that are not used by any query.
/// Returns the number of parts for which index was unloaded.
size_t unloadPrimaryKeysOfOutdatedParts();
size_t unloadPrimaryKeysAndClearCachesOfOutdatedParts();
protected:
friend class IMergeTreeDataPart;
@ -1335,7 +1335,7 @@ protected:
std::mutex grab_old_parts_mutex;
/// The same for clearOldTemporaryDirectories.
std::mutex clear_old_temporary_directories_mutex;
/// The same for unloadPrimaryKeysOfOutdatedParts.
/// The same for unloadPrimaryKeysAndClearCachesOfOutdatedParts.
std::mutex unload_primary_key_mutex;
void checkProperties(

View File

@ -175,6 +175,16 @@ void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, Mark
loader.loadMarks();
}
void MergeTreeDataPartCompact::removeMarksFromCache(MarkCache * mark_cache) const
{
if (!mark_cache)
return;
auto mark_path = index_granularity_info.getMarksFilePath(DATA_FILE_NAME);
auto key = MarkCache::hash(fs::path(getRelativePathOfActivePart()) / mark_path);
mark_cache->remove(key);
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
{
if (!getColumnPosition(column.getNameInStorage()))

View File

@ -55,6 +55,7 @@ public:
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
void removeMarksFromCache(MarkCache * mark_cache) const override;
~MergeTreeDataPartCompact() override;

View File

@ -244,6 +244,27 @@ void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCac
loader->loadMarks();
}
void MergeTreeDataPartWide::removeMarksFromCache(MarkCache * mark_cache) const
{
if (!mark_cache)
return;
const auto & serializations = getSerializations();
for (const auto & [column_name, serialization] : serializations)
{
serialization->enumerateStreams([&](const auto & subpath)
{
auto stream_name = getStreamNameForColumn(column_name, subpath, checksums);
if (!stream_name)
return;
auto mark_path = index_granularity_info.getMarksFilePath(*stream_name);
auto key = MarkCache::hash(fs::path(getRelativePathOfActivePart()) / mark_path);
mark_cache->remove(key);
});
}
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{
return getDataPartStorage().isStoredOnRemoteDisk();

View File

@ -52,6 +52,7 @@ public:
std::optional<time_t> getColumnModificationTime(const String & column_name) const override;
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
void removeMarksFromCache(MarkCache * mark_cache) const override;
protected:
static void loadIndexGranularityImpl(

View File

@ -225,12 +225,13 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
projection->getDataPartStorage().precommitTransaction();
}
/// This method must be called after rename and commit of part
/// because a correct path is required for the keys of caches.
void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
{
/// This method must be called after rename and commit of part
/// because a correct path is required for the keys of caches.
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = part->storage.getMarkCacheToPrewarm())
if (auto mark_cache = part->storage.getMarkCacheToPrewarm(bytes_uncompressed))
{
for (const auto & stream : streams)
{
@ -239,7 +240,7 @@ void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
}
}
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
/// Index was already set during writing. Now move it to cache.
part->moveIndexToCache(*index_cache);
@ -726,6 +727,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
compression_codec,
std::move(index_granularity_ptr),
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
block.bytes(),
/*reset_columns=*/ false,
/*blocks_are_granules_size=*/ false,
context->getWriteSettings());
@ -880,6 +882,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
compression_codec,
std::move(index_granularity_ptr),
Tx::PrehistoricTID,
block.bytes(),
/*reset_columns=*/ false,
/*blocks_are_granules_size=*/ false,
data.getContext()->getWriteSettings());

View File

@ -16,6 +16,7 @@ namespace ProfileEvents
{
extern const Event WaitMarksLoadMicroseconds;
extern const Event BackgroundLoadingMarksTasks;
extern const Event LoadedMarksFiles;
extern const Event LoadedMarksCount;
extern const Event LoadedMarksMemoryBytes;
}
@ -203,6 +204,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
ProfileEvents::increment(ProfileEvents::LoadedMarksFiles);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * num_columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksMemoryBytes, res->approximateMemoryUsage());
@ -264,7 +266,7 @@ void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & c
for (const auto & [stream_name, marks] : cached_marks)
{
auto mark_path = part.index_granularity_info.getMarksFilePath(stream_name);
auto key = MarkCache::hash(fs::path(part.getDataPartStorage().getFullPath()) / mark_path);
auto key = MarkCache::hash(fs::path(part.getRelativePathOfActivePart()) / mark_path);
mark_cache->set(key, std::make_shared<MarksInCompressedFile>(*marks));
}
}

View File

@ -83,6 +83,9 @@ struct MergeTreeSettings;
/// Adds computed marks for part to the marks cache.
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache);
/// Removes cached marks for all columns from part.
void removeMarksFromCache(const IMergeTreeDataPart & part, MarkCache * mark_cache);
/// Returns the list of columns suitable for prewarming of mark cache according to settings.
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list);

View File

@ -244,6 +244,7 @@ namespace ErrorCodes
DECLARE(Bool, prewarm_primary_key_cache, false, "If true primary index cache will be prewarmed by saving the primary index to the cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(Bool, prewarm_mark_cache, false, "If true mark cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(String, columns_to_prewarm_mark_cache, "", "List of columns to prewarm mark cache for (if enabled). Empty means all columns", 0) \
DECLARE(UInt64, min_bytes_to_prewarm_caches, 0, "Minimal size (uncompressed bytes) to prewarm mark cache and primary index cache for new parts", 0) \
/** Projection settings. */ \
DECLARE(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
DECLARE(LightweightMutationProjectionMode, lightweight_mutation_projection_mode, LightweightMutationProjectionMode::THROW, "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop projections of this table's relevant parts, or rebuild the projections.", 0) \
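The new `min_bytes_to_prewarm_caches` setting works together with the two prewarm flags declared above. A sketch of how a table might opt in; the table name and the 10 MB threshold are arbitrary assumptions.
```sql
-- Sketch: prewarm mark and primary index caches, but skip parts whose
-- uncompressed size is below the threshold (they are likely to be merged away soon).
CREATE TABLE prewarm_demo (key UInt64, value String)
ENGINE = MergeTree ORDER BY key
SETTINGS prewarm_mark_cache = 1,
         prewarm_primary_key_cache = 1,
         min_bytes_to_prewarm_caches = 10000000;
```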

View File

@ -29,6 +29,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
size_t part_uncompressed_bytes,
bool reset_columns_,
bool blocks_are_granules_size,
const WriteSettings & write_settings_)
@ -38,9 +39,9 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, write_settings(write_settings_)
{
/// Save marks in memory if prewarm is enabled to avoid re-reading marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm(part_uncompressed_bytes) != nullptr;
/// Save primary index in memory if cache is disabled or is enabled with prewarm to avoid re-reading primary index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm(part_uncompressed_bytes);
MergeTreeWriterSettings writer_settings(
data_part->storage.getContext()->getSettingsRef(),

View File

@ -24,6 +24,7 @@ public:
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
size_t part_uncompressed_bytes,
bool reset_columns_ = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {});

View File

@ -21,13 +21,14 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
size_t part_uncompressed_bytes,
WrittenOffsetColumns * offset_columns)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
/// Save marks in memory if prewarm is enabled to avoid re-reading marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm(part_uncompressed_bytes) != nullptr;
/// Save primary index in memory if cache is disabled or is enabled with prewarm to avoid re-reading primary index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm(part_uncompressed_bytes);
/// Granularity is never recomputed while writing only columns.
MergeTreeWriterSettings writer_settings(

View File

@ -22,6 +22,7 @@ public:
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
size_t part_uncompressed_bytes,
WrittenOffsetColumns * offset_columns = nullptr);
void write(const Block & block) override;

View File

@ -1625,8 +1625,8 @@ private:
else
{
index_granularity_ptr = createMergeTreeIndexGranularity(
ctx->new_data_part->rows_count,
ctx->new_data_part->getBytesUncompressedOnDisk(),
ctx->source_part->rows_count,
ctx->source_part->getBytesUncompressedOnDisk(),
*ctx->data->getSettings(),
ctx->new_data_part->index_granularity_info,
/*blocks_are_granules=*/ false);
@ -1641,6 +1641,7 @@ private:
ctx->compression_codec,
std::move(index_granularity_ptr),
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
ctx->source_part->getBytesUncompressedOnDisk(),
/*reset_columns=*/ true,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings());
@ -1876,7 +1877,8 @@ private:
std::vector<MergeTreeIndexPtr>(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()),
ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()),
ctx->compression_codec,
ctx->source_part->index_granularity);
ctx->source_part->index_granularity,
ctx->source_part->getBytesUncompressedOnDisk());
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);

View File

@ -193,7 +193,7 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
cleaned_part_like += storage.clearEmptyParts();
}
cleaned_part_like += storage.unloadPrimaryKeysOfOutdatedParts();
cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts();
/// We need to measure the number of removed objects somehow (for better scheduling),
/// but just summing the number of removed async blocks, logs, and empty parts does not make any sense.

View File

@ -130,7 +130,6 @@ private:
std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
void prewarmCaches(const MergeTreeDataWriter::TemporaryPart & temp_part) const;
};
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;

View File

@ -209,7 +209,7 @@ struct DeltaLakeMetadataImpl
if (!object)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to parse metadata file");
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
object->stringify(oss);
LOG_TEST(log, "Metadata: {}", oss.str());

View File

@ -158,8 +158,8 @@ StorageMergeTree::StorageMergeTree(
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
getMarkCacheToPrewarm(0),
getPrimaryIndexCacheToPrewarm(0));
}
@ -1522,7 +1522,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
cleared_count += clearOldPartsFromFilesystem();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
cleared_count += unloadPrimaryKeysOfOutdatedParts();
cleared_count += unloadPrimaryKeysAndClearCachesOfOutdatedParts();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);

View File

@ -515,8 +515,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
getMarkCacheToPrewarm(0),
getPrimaryIndexCacheToPrewarm(0));
if (LoadingStrictnessLevel::ATTACH <= mode)
{
@ -5089,13 +5089,15 @@ bool StorageReplicatedMergeTree::fetchPart(
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
if (auto mark_cache = getMarkCacheToPrewarm())
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = getMarkCacheToPrewarm(bytes_uncompressed))
{
auto column_names = getColumnsToPrewarmMarks(*getSettings(), part->getColumns());
part->loadMarksToCache(column_names, mark_cache.get());
}
if (auto index_cache = getPrimaryIndexCacheToPrewarm())
if (auto index_cache = getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
part->loadIndexToCache(*index_cache);
}

View File

@ -1596,7 +1596,7 @@ void StorageWindowView::writeIntoWindowView(
return std::make_shared<DeduplicationToken::SetViewBlockNumberTransform>(stream_header);
});
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: After tmp table before squashing", stream_header);
@ -1643,7 +1643,7 @@ void StorageWindowView::writeIntoWindowView(
lateness_upper_bound);
});
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: After WatermarkTransform", stream_header);
@ -1668,7 +1668,7 @@ void StorageWindowView::writeIntoWindowView(
builder.addSimpleTransform([&](const Block & header_) { return std::make_shared<ExpressionTransform>(header_, convert_actions); });
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Before out", stream_header);

View File

@ -2,7 +2,7 @@
#include <Storages/buildQueryTreeForShard.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/createUniqueTableAliases.h>
#include <Analyzer/createUniqueAliasesIfNecessary.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/IQueryTreeNode.h>
@ -427,7 +427,7 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
removeGroupingFunctionSpecializations(query_tree_to_modify);
createUniqueTableAliases(query_tree_to_modify, nullptr, planner_context->getQueryContext());
createUniqueAliasesIfNecessary(query_tree_to_modify, planner_context->getQueryContext());
// Get rid of the settings clause so we don't send them to remote. Thus newly non-important
// settings won't break any remote parser. It's also more reasonable since the query settings

View File

@ -55,6 +55,8 @@ source /repo/tests/docker_scripts/utils.lib
# install test configs
/repo/tests/config/install.sh
azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --silent --inMemoryPersistence &
/repo/tests/docker_scripts/setup_minio.sh stateless
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml

View File

@ -80,6 +80,7 @@ def _check_exception(exception, expected_tries=3):
for i, line in enumerate(lines[3 : 3 + expected_tries]):
expected_lines = (
"Code: 209. " + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
"Code: 209. " + EXCEPTION_NETWORK + "Timeout: connect timed out",
EXCEPTION_CONNECT_TIMEOUT,
EXCEPTION_TIMEOUT,
)

View File

@ -74,7 +74,11 @@ SELECT _shard_num, key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.p
FROM dist_1 a
JOIN system.clusters b
ON _shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError INVALID_JOIN_ON_EXPRESSION }
WHERE b.cluster = 'test_cluster_two_shards_localhost'
ORDER BY key
SETTINGS enable_analyzer = 1;
1 10 localhost 1 9000
1 20 localhost 1 9000
SELECT 'Rewrite with alias';
Rewrite with alias
SELECT a._shard_num, key FROM dist_1 a;
@ -85,7 +89,11 @@ SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'),
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError UNKNOWN_IDENTIFIER, 403 }
WHERE b.cluster = 'test_cluster_two_shards_localhost'
ORDER BY key
SETTINGS enable_analyzer = 1;
1 10 localhost 1 9000
1 20 localhost 1 9000
SELECT 'dist_3';
dist_3
SELECT * FROM dist_3;

View File

@ -73,16 +73,21 @@ SELECT _shard_num, key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.p
FROM dist_1 a
JOIN system.clusters b
ON _shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError INVALID_JOIN_ON_EXPRESSION }
WHERE b.cluster = 'test_cluster_two_shards_localhost'
ORDER BY key
SETTINGS enable_analyzer = 1;
SELECT 'Rewrite with alias';
SELECT a._shard_num, key FROM dist_1 a;
-- the same with JOIN, just in case
SELECT a._shard_num, a.key, b.host_name, b.host_address IN ('::1', '127.0.0.1'), b.port
FROM dist_1 a
JOIN system.clusters b
ON a._shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'; -- { serverError UNKNOWN_IDENTIFIER, 403 }
WHERE b.cluster = 'test_cluster_two_shards_localhost'
ORDER BY key
SETTINGS enable_analyzer = 1;
SELECT 'dist_3';
SELECT * FROM dist_3;
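
As context for the hunk above: the test now expects the join between a Distributed table's _shard_num virtual column and system.clusters to succeed under the new analyzer instead of raising INVALID_JOIN_ON_EXPRESSION. A minimal, hypothetical sketch of the same pattern — the table names are illustrative and the 'test_cluster_two_shards_localhost' cluster is assumed to exist in the server config, as it does in the test environment:

-- local table plus a Distributed wrapper over the assumed two-shard localhost cluster
CREATE TABLE local_sketch (key Int32) ENGINE = MergeTree ORDER BY key;
CREATE TABLE dist_sketch AS local_sketch
    ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), local_sketch);
INSERT INTO local_sketch VALUES (10), (20);

-- join the virtual _shard_num column against system.clusters; accepted with the analyzer enabled
SELECT _shard_num, key, b.host_name
FROM dist_sketch AS a
JOIN system.clusters AS b ON _shard_num = b.shard_num
WHERE b.cluster = 'test_cluster_two_shards_localhost'
ORDER BY key
SETTINGS enable_analyzer = 1;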

View File

@ -6,6 +6,8 @@
0
0
0
0
0
0
┌─system.one.dummy─┬─A.dummy─┬─B.dummy─┐
1. │ 0 │ 0 │ 0 │

View File

@ -12,9 +12,9 @@ USE system;
SELECT dummy FROM one AS A JOIN one ON A.dummy = one.dummy;
SELECT dummy FROM one JOIN one AS A ON A.dummy = one.dummy;
SELECT dummy FROM one l JOIN one r ON dummy = r.dummy;
SELECT dummy FROM one l JOIN one r ON l.dummy = dummy; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT dummy FROM one l JOIN one r ON one.dummy = r.dummy;
SELECT dummy FROM one l JOIN one r ON l.dummy = one.dummy; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT dummy FROM one l JOIN one r ON l.dummy = dummy;
SELECT dummy FROM one l JOIN one r ON l.dummy = one.dummy;
SELECT * from one
JOIN one A ON one.dummy = A.dummy
@ -26,5 +26,4 @@ JOIN system.one one ON A.dummy = one.dummy
JOIN system.one two ON A.dummy = two.dummy
FORMAT PrettyCompact;
-- SELECT one.dummy FROM one AS A FULL JOIN (SELECT 0 AS dymmy) AS one USING dummy;
SELECT one.dummy FROM one AS A JOIN (SELECT 0 AS dummy) B USING dummy;

View File

@ -2,7 +2,13 @@ drop table if exists test_01081;
create table test_01081 (key Int) engine=MergeTree() order by key;
insert into test_01081 select * from system.numbers limit 10;
select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system.one as rhs on rhs.dummy = 1 order by 1; -- { serverError INVALID_JOIN_ON_EXPRESSION }
select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system.one as rhs on rhs.dummy = 1 order by 1
SETTINGS enable_analyzer = 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }
select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system.one as rhs on rhs.dummy = 1 order by 1
SETTINGS enable_analyzer = 1;
-- With multiple blocks triggers:
--
@ -11,6 +17,10 @@ select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system
-- _dummy Int Int32(size = 0), 1 UInt8 Const(size = 0, UInt8(size = 1)).
insert into test_01081 select * from system.numbers limit 10;
select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system.one as rhs on rhs.dummy = 1 order by 1; -- { serverError INVALID_JOIN_ON_EXPRESSION }
select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system.one as rhs on rhs.dummy = 1 order by 1
SETTINGS enable_analyzer = 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }
select 1 from remote('127.{1,2}', currentDatabase(), test_01081) lhs join system.one as rhs on rhs.dummy = 1 order by 1
SETTINGS enable_analyzer = 1;
drop table if exists test_01081;
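
The split made in this hunk can be read as: the same one-sided ON condition is still expected to fail under the old analyzer and to be accepted under the new one. A minimal sketch of that behaviour, using an illustrative table name in place of test_01081:

-- sketch table standing in for test_01081
create table t_sketch (key Int) engine = MergeTree() order by key;
insert into t_sketch select * from system.numbers limit 10;

-- old analyzer: an ON clause referencing only the right-hand table is rejected
select 1 from remote('127.{1,2}', currentDatabase(), t_sketch) lhs
join system.one as rhs on rhs.dummy = 1 order by 1
settings enable_analyzer = 0; -- { serverError INVALID_JOIN_ON_EXPRESSION }

-- new analyzer: the same condition is accepted and the query runs
select 1 from remote('127.{1,2}', currentDatabase(), t_sketch) lhs
join system.one as rhs on rhs.dummy = 1 order by 1
settings enable_analyzer = 1;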

View File

@ -1,14 +1,6 @@
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON (arrayJoin([1]) = B.b); -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON (A.a = arrayJoin([1])); -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON equals(a); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH, 62 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON less(a); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH, 62 }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON a = b AND a > b; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON a = b AND a < b; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON a = b AND a >= b; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SELECT 1 FROM (select 1 a) A JOIN (select 1 b) B ON a = b AND a <= b; -- { serverError INVALID_JOIN_ON_EXPRESSION }
SET join_algorithm = 'partial_merge';
SELECT 1 FROM (select 1 a) A JOIN (select 1 b, 1 c) B ON a = b OR a = c; -- { serverError NOT_IMPLEMENTED }
-- works for a = b OR a = b because of equivalent disjunct optimization
@ -16,8 +8,3 @@ SELECT 1 FROM (select 1 a) A JOIN (select 1 b, 1 c) B ON a = b OR a = c; -- { se
SET join_algorithm = 'grace_hash';
SELECT 1 FROM (select 1 a) A JOIN (select 1 b, 1 c) B ON a = b OR a = c; -- { serverError NOT_IMPLEMENTED }
-- works for a = b OR a = b because of equivalent disjunct optimization
SET join_algorithm = 'hash';
-- conditions for different table joined via OR
SELECT * FROM (SELECT 1 AS a, 1 AS b, 1 AS c) AS t1 INNER JOIN (SELECT 1 AS a, 1 AS b, 1 AS c) AS t2 ON t1.a = t2.a AND (t1.b > 0 OR t2.b > 0); -- { serverError INVALID_JOIN_ON_EXPRESSION }
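
The lines removed in this hunk previously pinned down INVALID_JOIN_ON_EXPRESSION for ON clauses mixing an equality with an inequality between the two tables. A minimal sketch of the kind of condition that is no longer expected to error, assuming the default hash join algorithm and illustrative column names:

-- equality plus an inequality between columns of different tables in one ON clause
SELECT *
FROM (SELECT 1 AS a, 2 AS x) AS t1
INNER JOIN (SELECT 1 AS b, 1 AS y) AS t2
    ON t1.a = t2.b AND t1.x > t2.y
SETTINGS enable_analyzer = 1;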

View File

@ -121,8 +121,8 @@ Filter column: and(notEquals(y, 2), notEquals(x, 0))
ARRAY JOIN x
Filter column: notEquals(y, 2)
> (analyzer) filter is split, one part is filtered before ARRAY JOIN
Filter column: and(notEquals(__table2.y, 2_UInt8), notEquals(__table1.x, 0_UInt8))
ARRAY JOIN __table1.x
Filter column: and(notEquals(__table2.y, 2_UInt8), notEquals(__array_join_exp_1, 0_UInt8))
ARRAY JOIN __array_join_exp_1
Filter column: notEquals(__table2.y, 2_UInt8)
1 3
> filter is pushed down before Distinct

View File

@ -134,7 +134,7 @@ $CLICKHOUSE_CLIENT --enable_analyzer=1 -q "
explain actions = 1 select x, y from (
select range(number) as x, number + 1 as y from numbers(3)
) array join x where y != 2 and x != 0" |
grep -o "Filter column: and(notEquals(__table2.y, 2_UInt8), notEquals(__table1.x, 0_UInt8))\|ARRAY JOIN __table1.x\|Filter column: notEquals(__table2.y, 2_UInt8)"
grep -o "Filter column: and(notEquals(__table2.y, 2_UInt8), notEquals(__array_join_exp_1, 0_UInt8))\|ARRAY JOIN __array_join_exp_1\|Filter column: notEquals(__table2.y, 2_UInt8)"
$CLICKHOUSE_CLIENT -q "
select x, y from (
select range(number) as x, number + 1 as y from numbers(3)

View File

@ -11,7 +11,7 @@ select * from (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f
select * from (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t1 left join (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t2 on (t1.d = t2.b or t1.c = t2.b or t1.d = t2.b and t1.d = t2.b) or (t1.e = t2.e and t1.a=t2.a and t2.f=t1.f); -- { serverError INVALID_JOIN_ON_EXPRESSION }
select * from (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t1 right join (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t2 on (t1.d = t2.b or t1.c = t2.b) or t1.e = t2.e; -- { serverError INVALID_JOIN_ON_EXPRESSION }
select * from (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t1 inner join (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t2 on (t1.d = t2.b or t1.c = t2.b) or (t1.e = t2.e and t1.a=t2.a and t2.f=t1.f); -- { serverError INVALID_JOIN_ON_EXPRESSION }
select * from (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t1 inner join (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t2 on (t1.d = t2.b or t1.c = t2.b) and (t1.e = t2.e or t1.f=t2.f); -- { serverError INVALID_JOIN_ON_EXPRESSION }
select * from (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t1 inner join (select 'a' as a, 'b' as b, 'c' as c, 'd' as d, 'e' as e, 'f' as f) as t2 on (t1.d = t2.b or t1.c = t2.b) and (t1.e = t2.e or t1.f=t2.f) SETTINGS enable_analyzer = 1;
SET joined_subquery_requires_alias = 0;
SET max_threads = 1;
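
The query added in this hunk runs a previously rejected ON clause built from disjunctions of cross-table conditions under the analyzer. A simplified, hypothetical sketch of the same shape of condition, assuming the default hash join algorithm:

-- disjunction of equality conditions spanning both tables, accepted with the analyzer
SELECT *
FROM (SELECT 1 AS a, 2 AS b) AS t1
INNER JOIN (SELECT 1 AS a, 3 AS b) AS t2
    ON (t1.a = t2.a OR t1.b = t2.b)
SETTINGS enable_analyzer = 1;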

Some files were not shown because too many files have changed in this diff.