Merge branch 'ClickHouse:master' into master

This commit is contained in:
Zimu Li 2024-05-15 22:38:22 -07:00 committed by GitHub
commit c0f5d70c5c
GPG Key ID: B5690EEEBB952194
65 changed files with 673 additions and 212 deletions

View File

@ -3301,3 +3301,31 @@ The setting is not enabled by default for security reasons, because some headers
HTTP headers are case-sensitive for this function.
If the function is used in the context of a distributed query, it returns a non-empty result only on the initiator node.
## showCertificate
Shows information about the current server's Secure Sockets Layer (SSL) certificate if it has been configured. See [Configuring SSL-TLS](https://clickhouse.com/docs/en/guides/sre/configuring-ssl) for more information on how to configure ClickHouse to use OpenSSL certificates to validate connections.
**Syntax**
```sql
showCertificate()
```
**Returned value**
- Map of key-value pairs relating to the configured SSL certificate. [Map](../../sql-reference/data-types/map.md)([String](../../sql-reference/data-types/string.md), [String](../../sql-reference/data-types/string.md)).
**Example**
Query:
```sql
SELECT showCertificate() FORMAT LineAsString;
```
Result:
```response
{'version':'1','serial_number':'2D9071D64530052D48308473922C7ADAFA85D6C5','signature_algo':'sha256WithRSAEncryption','issuer':'/CN=marsnet.local CA','not_before':'May 7 17:01:21 2024 GMT','not_after':'May 7 17:01:21 2025 GMT','subject':'/CN=chnode1','pkey_algo':'rsaEncryption'}
```
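Since the return value is a Map(String, String), individual fields can be pulled out with map subscripting; a minimal sketch (key names taken from the example output above):

```sql
-- Read a single field of the configured certificate, e.g. its expiration date.
SELECT showCertificate()['not_after'] AS certificate_expires;
```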

View File

@ -1,5 +1,5 @@
---
slug: /en/operations/utilities/backupview
slug: /ru/operations/utilities/backupview
title: clickhouse_backupview
---

View File

@ -1,5 +1,5 @@
---
slug: /ru/sql-reference/functions/functions-for-nulls
slug: /ru/sql-reference/functions/null-functions
sidebar_position: 63
sidebar_label: "Функции для работы с Nullable-аргументами"
---

View File

@ -1,5 +1,5 @@
---
slug: /zh/sql-reference/functions/functions-for-nulls
slug: /zh/sql-reference/functions/null-functions
---
# Nullable处理函数 {#nullablechu-li-han-shu}

View File

@ -753,13 +753,21 @@ size_t getMaxArraySize()
return 0xFFFFFF;
}
bool hasLimitArraySize()
{
if (auto context = Context::getGlobalContextInstance())
return context->getServerSettings().aggregate_function_group_array_has_limit_size;
return false;
}
template <bool Tlast>
AggregateFunctionPtr createAggregateFunctionGroupArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);
bool limit_size = false;
bool limit_size = hasLimitArraySize();
UInt64 max_elems = getMaxArraySize();
if (parameters.empty())

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion)
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion, UInt64 server_revision)
{
/// NOTE: Once you will update the completion list,
/// do not forget to update 01676_clickhouse_client_autocomplete.sh
@ -60,7 +60,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
add_column("name", "data_type_families", false, {});
add_column("name", "merge_tree_settings", false, {});
add_column("name", "settings", false, {});
add_column("keyword", "keywords", false, {});
if (server_revision >= DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE)
add_column("keyword", "keywords", false, {});
if (!basic_suggestion)
{
@ -101,7 +103,11 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
auto connection = ConnectionType::createConnection(connection_parameters, my_context);
fetch(*connection,
connection_parameters.timeouts,
getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>),
getLoadSuggestionQuery(
suggestion_limit,
std::is_same_v<ConnectionType, LocalConnection>,
connection->getServerRevision(connection_parameters.timeouts)
),
my_context->getClientInfo());
}
catch (const Exception & e)
@ -146,7 +152,7 @@ void Suggest::load(IServerConnection & connection,
{
try
{
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true), client_info);
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true, connection.getServerRevision(timeouts)), client_info);
}
catch (...)
{
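With this change the client only asks for keyword completions when the server is new enough to have the `system.keywords` table (revision 54468 and later). The extra piece of the generated suggestion query is roughly the sketch below; the LIMIT is illustrative, not part of the real query.

```sql
-- Only sent when the server revision is >= DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE.
SELECT keyword FROM system.keywords LIMIT 5;
```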

View File

@ -625,6 +625,8 @@ The server successfully detected this situation and will download merged part fr
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueFailedFiles, "Number of files which failed to be processed")\
M(S3QueueProcessedFiles, "Number of files which were processed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\

View File

@ -79,6 +79,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
/// Send read-only flag for Replicated tables as well
static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -86,6 +88,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54467;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54468;
}

View File

@ -50,6 +50,7 @@ namespace DB
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
M(Bool, aggregate_function_group_array_has_limit_size, false, "When the max array element size is exceeded, a `Too large array size` exception will be thrown by default. When set to true, no exception will be thrown, and the excess elements will be discarded.", 0) \
M(UInt64, max_server_memory_usage, 0, "Maximum total memory usage of the server in bytes. Zero means unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to RAM ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Maximum total memory usage for merges and mutations in bytes. Zero means unlimited.", 0) \
@ -57,7 +58,7 @@ namespace DB
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
M(UInt64, cgroups_memory_usage_observer_wait_time, 15, "Polling interval in seconds to read the current memory usage from cgroups. Zero means disabled.", 0) \
M(Double, cgroup_memory_watcher_hard_limit_ratio, 0.95, "Hard memory limit ratio for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Sort memory limit ratio limit for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Soft memory limit ratio limit for cgroup memory usage observer", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
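A minimal SQL sketch of what the new `aggregate_function_group_array_has_limit_size` server setting changes, assuming the server config caps `aggregate_function_group_array_max_element_size` at 10 and enables the new flag (as in the integration test config further down in this diff):

```sql
CREATE TABLE tab (x AggregateFunction(groupArray, Array(UInt8))) ENGINE = MergeTree ORDER BY tuple();
-- With the flag enabled, the state is silently truncated to the configured limit
-- instead of throwing a "Too large array size" exception on insert.
INSERT INTO tab SELECT groupArrayState([zero]) FROM zeros(100);
SELECT length(groupArrayMerge(x)) FROM tab; -- returns 10
```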

View File

@ -92,7 +92,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"output_format_pretty_preserve_border_for_multiline_string", 1, 1, "Applies better rendering for multiline strings."},
{"output_format_pretty_preserve_border_for_multiline_string", 0, 1, "Applies better rendering for multiline strings."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},

View File

@ -139,7 +139,11 @@ namespace
S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
String endpoint_subpath;
if (config.has(config_prefix + ".endpoint_subpath"))
endpoint_subpath = context->getMacros()->expand(config.getString(config_prefix + ".endpoint_subpath"));
S3::URI uri(fs::path(endpoint) / endpoint_subpath);
/// An empty key remains empty.
if (!uri.key.empty() && !uri.key.ends_with('/'))

View File

@ -7,6 +7,7 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNothing.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnString.h>
@ -3770,6 +3771,12 @@ private:
}
else if (const auto * from_array = typeid_cast<const DataTypeArray *>(from_type_untyped.get()))
{
if (typeid_cast<const DataTypeNothing *>(from_array->getNestedType().get()))
return [nested = to_type->getNestedType()](ColumnsWithTypeAndName &, const DataTypePtr &, const ColumnNullable *, size_t size)
{
return ColumnMap::create(nested->createColumnConstWithDefaultValue(size)->convertToFullColumnIfConst());
};
const auto * nested_tuple = typeid_cast<const DataTypeTuple *>(from_array->getNestedType().get());
if (!nested_tuple || nested_tuple->getElements().size() != 2)
throw Exception(
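This branch handles casting an empty array (type `Array(Nothing)`) to `Map`, producing an empty map of the target type instead of failing. The regression case from the new test, where an empty `map()` literal has to be serialized for a remote shard:

```sql
-- Previously this could fail when the empty map() was sent to the second shard.
SELECT map() FROM remote('127.0.0.{1,2}', system, one);
```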

View File

@ -505,7 +505,7 @@ ASTPtr InterpreterCreateQuery::formatProjections(const ProjectionsDescription &
}
ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode)
const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode, bool is_restore_from_backup)
{
/// First, deduce implicit types.
@ -514,7 +514,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NamesAndTypesList column_names_and_types;
bool make_columns_nullable = mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().data_type_default_nullable;
bool make_columns_nullable = mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().data_type_default_nullable;
bool has_columns_with_default_without_type = false;
for (const auto & ast : columns_ast.children)
@ -694,7 +694,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
res.add(std::move(column));
}
if (mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().flatten_nested)
if (mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().flatten_nested)
res.flattenNested();
@ -739,7 +739,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
if (create.columns_list->columns)
{
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode);
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode, is_restore_from_backup);
}
if (create.columns_list->indices)

View File

@ -74,7 +74,7 @@ public:
/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode);
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode, bool is_restore_from_backup = false);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name);

View File

@ -700,8 +700,10 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
/// We need to check type of masks before `addConditionJoinColumn`, because it assumes that types is correct
JoinCommon::checkTypesOfMasks(block, mask_column_name_left, right_sample_block, mask_column_name_right);
/// Add auxiliary column, will be removed after joining
addConditionJoinColumn(block, JoinTableSide::Left);
if (!not_processed)
/// Add an auxiliary column, which will be removed after joining
/// We do not need to add it twice when we are continuing to process the block from the previous iteration
addConditionJoinColumn(block, JoinTableSide::Left);
/// Types of keys can be checked only after `checkTypesOfKeys`
JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right);

View File

@ -107,6 +107,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!columns_p.parse(pos, columns, expected))
return false;
/// Optional trailing comma
ParserToken(TokenType::Comma).ignore(pos);
if (!s_rparen.ignore(pos, expected))
return false;
}
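The parser now tolerates an optional trailing comma after the column list of an INSERT; a minimal example mirroring the new test added later in this diff:

```sql
CREATE TEMPORARY TABLE t (a UInt8, b UInt8, c UInt8);
-- The trailing comma after the last column name is now accepted.
INSERT INTO t (a, b, c, ) VALUES (1, 2, 3);
```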

View File

@ -4,20 +4,6 @@
namespace DB
{
Tokens::Tokens(const char * begin, const char * end, size_t max_query_size, bool skip_insignificant)
{
Lexer lexer(begin, end, max_query_size);
bool stop = false;
do
{
Token token = lexer.nextToken();
stop = token.isEnd() || token.type == TokenType::ErrorMaxQuerySizeExceeded;
if (token.isSignificant() || (!skip_insignificant && !data.empty() && data.back().isSignificant()))
data.emplace_back(std::move(token));
} while (!stop);
}
UnmatchedParentheses checkUnmatchedParentheses(TokenIterator begin)
{
/// We have just two kind of parentheses: () and [].

View File

@ -15,25 +15,44 @@ namespace DB
*/
/** Used as an input for parsers.
* All whitespace and comment tokens are transparently skipped.
* All whitespace and comment tokens are transparently skipped if `skip_insignificant`.
*/
class Tokens
{
private:
std::vector<Token> data;
std::size_t last_accessed_index = 0;
Lexer lexer;
bool skip_insignificant;
public:
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant = true);
ALWAYS_INLINE inline const Token & operator[](size_t index)
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant_ = true)
: lexer(begin, end, max_query_size), skip_insignificant(skip_insignificant_)
{
assert(index < data.size());
last_accessed_index = std::max(last_accessed_index, index);
return data[index];
}
ALWAYS_INLINE inline const Token & max() { return data[last_accessed_index]; }
const Token & operator[] (size_t index)
{
while (true)
{
if (index < data.size())
return data[index];
if (!data.empty() && data.back().isEnd())
return data.back();
Token token = lexer.nextToken();
if (!skip_insignificant || token.isSignificant())
data.emplace_back(token);
}
}
const Token & max()
{
if (data.empty())
return (*this)[0];
return data.back();
}
};

View File

@ -13,10 +13,14 @@ namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp(format_settings.regexp.regexp), skip_unmatched(format_settings.regexp.skip_unmatched)
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp_str(format_settings.regexp.regexp), regexp(regexp_str), skip_unmatched(format_settings.regexp.skip_unmatched)
{
if (regexp_str.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The regular expression is not set for the `Regexp` format. It requires setting the value of the `format_regexp` setting.");
size_t fields_count = regexp.NumberOfCapturingGroups();
matched_fields.resize(fields_count);
re2_arguments.resize(fields_count);
@ -58,8 +62,8 @@ bool RegexpFieldExtractor::parseRow(PeekableReadBuffer & buf)
static_cast<int>(re2_arguments_ptrs.size()));
if (!match && !skip_unmatched)
throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp.",
std::string(buf.position(), line_to_match));
throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp: `{}`",
std::string(buf.position(), line_to_match), regexp_str);
buf.position() += line_size;
if (!buf.eof() && !checkChar('\n', buf))
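Two user-visible effects of this change, shown with the `format` table function as in the new tests: an explicit `BAD_ARGUMENTS` error when `format_regexp` is not set, and the regular expression itself quoted in the mismatch error. Both queries below are expected to fail with the respective messages.

```sql
-- Fails with: The regular expression is not set for the `Regexp` format ...
SELECT * FROM format(Regexp, 's String', 'Hello');
-- Fails with: Line "Hello" doesn't match the regexp: `Upyachka`
SELECT * FROM format(Regexp, 's String', 'Hello') SETTINGS format_regexp = 'Upyachka';
```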

View File

@ -31,6 +31,7 @@ public:
size_t getNumberOfGroups() const { return regexp.NumberOfCapturingGroups(); }
private:
String regexp_str;
const re2::RE2 regexp;
// The vector of fields extracted from line using regexp.
std::vector<std::string_view> matched_fields;

View File

@ -572,9 +572,16 @@ bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx)
skipWhitespaceIfAny(*buf);
if (likely(column_idx + 1 != num_columns))
{
return checkChar(',', *buf);
}
else
{
/// Optional trailing comma.
if (checkChar(',', *buf))
skipWhitespaceIfAny(*buf);
return checkChar(')', *buf);
}
}
bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
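The Values input format now also accepts a trailing comma inside each tuple; a minimal example mirroring the new test added later in this diff:

```sql
CREATE TEMPORARY TABLE t2 (a UInt8, b UInt8, c UInt8);
-- A trailing comma before the closing parenthesis is ignored, with or without a space.
INSERT INTO t2 (a, b, c) VALUES (1, 2, 3, );
INSERT INTO t2 (a, b, c) VALUES (4, 5, 6,);
```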

View File

@ -64,36 +64,61 @@ namespace
return non_const_columns;
}
/// build actions DAG from stack of steps
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
logActionsDAG("DAG to merge", clone);
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
bool compareAggregationKeysWithDistinctColumns(
const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions)
const Names & aggregation_keys, const DistinctColumns & distinct_columns, std::vector<std::vector<ActionsDAGPtr>> actions_chain)
{
logDebug("aggregation_keys", aggregation_keys);
logDebug("aggregation_keys size", aggregation_keys.size());
logDebug("distinct_columns size", distinct_columns.size());
std::set<std::string_view> original_distinct_columns;
FindOriginalNodeForOutputName original_node_finder(path_actions);
for (const auto & column : distinct_columns)
std::set<String> current_columns(begin(distinct_columns), end(distinct_columns));
std::set<String> source_columns;
for (auto & actions : actions_chain)
{
logDebug("distinct column name", column);
const auto * alias_node = original_node_finder.find(String(column));
if (!alias_node)
FindOriginalNodeForOutputName original_node_finder(buildActionsForPlanPath(actions));
for (const auto & column : current_columns)
{
logDebug("original name for alias is not found", column);
original_distinct_columns.insert(column);
}
else
{
logDebug("alias result name", alias_node->result_name);
original_distinct_columns.insert(alias_node->result_name);
logDebug("distinct column name", column);
const auto * alias_node = original_node_finder.find(String(column));
if (!alias_node)
{
logDebug("original name for alias is not found", column);
source_columns.insert(String(column));
}
else
{
logDebug("alias result name", alias_node->result_name);
source_columns.insert(alias_node->result_name);
}
}
current_columns = std::move(source_columns);
source_columns.clear();
}
/// if aggregation keys are part of distinct columns then rows already distinct
for (const auto & key : aggregation_keys)
{
if (!original_distinct_columns.contains(key))
if (!current_columns.contains(key))
{
logDebug("aggregation key NOT found: {}", key);
logDebug("aggregation key NOT found", key);
return false;
}
}
@ -122,30 +147,13 @@ namespace
return false;
}
/// build actions DAG from stack of steps
ActionsDAGPtr buildActionsForPlanPath(std::vector<ActionsDAGPtr> & dag_stack)
{
if (dag_stack.empty())
return nullptr;
ActionsDAGPtr path_actions = dag_stack.back()->clone();
dag_stack.pop_back();
while (!dag_stack.empty())
{
ActionsDAGPtr clone = dag_stack.back()->clone();
logActionsDAG("DAG to merge", clone);
dag_stack.pop_back();
path_actions->mergeInplace(std::move(*clone));
}
return path_actions;
}
bool passTillAggregation(const QueryPlan::Node * distinct_node)
{
const DistinctStep * distinct_step = typeid_cast<DistinctStep *>(distinct_node->step.get());
chassert(distinct_step);
std::vector<ActionsDAGPtr> dag_stack;
std::vector<std::vector<ActionsDAGPtr>> actions_chain;
const DistinctStep * inner_distinct_step = nullptr;
const IQueryPlanStep * aggregation_before_distinct = nullptr;
const QueryPlan::Node * node = distinct_node;
@ -163,6 +171,12 @@ namespace
break;
}
if (typeid_cast<const WindowStep *>(current_step))
{
actions_chain.push_back(std::move(dag_stack));
dag_stack.clear();
}
if (const auto * const expr = typeid_cast<const ExpressionStep *>(current_step); expr)
dag_stack.push_back(expr->getExpression());
else if (const auto * const filter = typeid_cast<const FilterStep *>(current_step); filter)
@ -177,16 +191,22 @@ namespace
if (aggregation_before_distinct)
{
ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack);
logActionsDAG("aggregation pass: merged DAG", actions);
if (actions_chain.empty())
actions_chain.push_back(std::move(dag_stack));
const auto distinct_columns = getDistinctColumns(distinct_step);
if (const auto * aggregating_step = typeid_cast<const AggregatingStep *>(aggregation_before_distinct); aggregating_step)
return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions);
{
return compareAggregationKeysWithDistinctColumns(
aggregating_step->getParams().keys, distinct_columns, std::move(actions_chain));
}
else if (const auto * merging_aggregated_step = typeid_cast<const MergingAggregatedStep *>(aggregation_before_distinct);
merging_aggregated_step)
return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions);
{
return compareAggregationKeysWithDistinctColumns(
merging_aggregated_step->getParams().keys, distinct_columns, std::move(actions_chain));
}
}
return false;

View File

@ -24,6 +24,8 @@ namespace ProfileEvents
extern const Event S3QueueSetFileProcessingMicroseconds;
extern const Event S3QueueSetFileProcessedMicroseconds;
extern const Event S3QueueSetFileFailedMicroseconds;
extern const Event S3QueueFailedFiles;
extern const Event S3QueueProcessedFiles;
extern const Event S3QueueCleanupMaxSetSizeOrTTLMicroseconds;
extern const Event S3QueueLockLocalFileStatusesMicroseconds;
extern const Event CannotRemoveEphemeralNode;
@ -138,7 +140,7 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con
, zookeeper_failed_path(zookeeper_path_ / "failed")
, zookeeper_shards_path(zookeeper_path_ / "shards")
, zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock")
, log(getLogger("S3QueueFilesMetadata"))
, log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")"))
{
if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec))
{
@ -472,7 +474,7 @@ S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string &
if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
}
@ -558,7 +560,7 @@ S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & p
if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log);
LOG_TEST(log, "File {} is ready to be processed", path);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
@ -609,6 +611,8 @@ void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder)
break;
}
}
ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles);
}
void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder)
@ -626,7 +630,7 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
Coordination::Responses responses;
if (holder->remove(&requests, &responses))
{
LOG_TEST(log, "Moved file `{}` to processed", path);
LOG_TRACE(log, "Moved file `{}` to processed", path);
if (max_loading_retries)
zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
return;
@ -663,7 +667,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = getZooKeeper();
LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path);
LOG_TRACE(log, "Setting file `{}` as processed (at {})", path, processed_node_path);
while (true)
{
std::string res;
@ -693,7 +697,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
{
if (holder->remove(&requests, &responses))
{
LOG_TEST(log, "Moved file `{}` to processed", path);
LOG_TRACE(log, "Moved file `{}` to processed", path);
if (max_loading_retries)
zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
return;
@ -704,7 +708,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
LOG_TEST(log, "Moved file `{}` to processed", path);
LOG_TRACE(log, "Moved file `{}` to processed", path);
return;
}
}
@ -712,7 +716,8 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
/// Failed to update max processed node, retry.
if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
{
LOG_TRACE(log, "Failed to update processed node ({}). Will retry.", magic_enum::enum_name(responses[0]->error));
LOG_TRACE(log, "Failed to update processed node for path {} ({}). Will retry.",
path, magic_enum::enum_name(responses[0]->error));
continue;
}
@ -752,6 +757,8 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
}
ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
SCOPE_EXIT({
file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get());
timer.cancel();
@ -774,7 +781,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
Coordination::Responses responses;
if (holder->remove(&requests, &responses))
{
LOG_TEST(log, "File `{}` failed to process and will not be retried. "
LOG_TRACE(log, "File `{}` failed to process and will not be retried. "
"Error: {}", path, exception_message);
return;
}
@ -788,6 +795,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
LOG_WARNING(log, "Cannot set file ({}) as processed since processing node "
"does not exist with expected processing id does not exist, "
"this could be a result of expired zookeeper session", path);
return;
}
@ -812,7 +820,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S
file_status->retries = node_metadata.retries;
}
LOG_TEST(log, "File `{}` failed to process, try {}/{} (Error: {})",
LOG_TRACE(log, "File `{}` failed to process, try {}/{} (Error: {})",
path, node_metadata.retries, max_loading_retries, exception_message);
/// Check if file can be retried further or not.
@ -868,13 +876,14 @@ S3QueueFilesMetadata::ProcessingNodeHolder::ProcessingNodeHolder(
const std::string & path_,
const std::string & zk_node_path_,
FileStatusPtr file_status_,
zkutil::ZooKeeperPtr zk_client_)
zkutil::ZooKeeperPtr zk_client_,
LoggerPtr logger_)
: zk_client(zk_client_)
, file_status(file_status_)
, path(path_)
, zk_node_path(zk_node_path_)
, processing_id(processing_id_)
, log(getLogger("ProcessingNodeHolder"))
, log(logger_)
{
}
@ -939,7 +948,7 @@ bool S3QueueFilesMetadata::ProcessingNodeHolder::remove(Coordination::Requests *
catch (...)
{
ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
DB::tryLogCurrentException(__PRETTY_FUNCTION__, "Cannot remove " + zk_node_path);
LOG_ERROR(log, "Failed to remove processing node for file {}: {}", path, getCurrentExceptionMessage(true));
}
return false;
}
@ -958,7 +967,7 @@ void S3QueueFilesMetadata::cleanupThreadFunc()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "Failed to cleanup nodes in zookeeper: {}", getCurrentExceptionMessage(true));
}
if (shutdown)
@ -970,38 +979,52 @@ void S3QueueFilesMetadata::cleanupThreadFunc()
void S3QueueFilesMetadata::cleanupThreadFuncImpl()
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds);
chassert(max_set_size || max_set_age_sec);
const bool check_nodes_limit = max_set_size > 0;
const bool check_nodes_ttl = max_set_age_sec > 0;
const auto zk_client = getZooKeeper();
Strings nodes;
auto code = zk_client->tryGetChildren(zookeeper_processed_path, nodes);
Strings processed_nodes;
auto code = zk_client->tryGetChildren(zookeeper_processed_path, processed_nodes);
if (code != Coordination::Error::ZOK)
{
if (code == Coordination::Error::ZNONODE)
{
LOG_TEST(log, "A `processed` not is not yet created");
return;
LOG_TEST(log, "Path {} does not exist", zookeeper_processed_path.string());
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
}
if (nodes.empty())
Strings failed_nodes;
code = zk_client->tryGetChildren(zookeeper_failed_path, failed_nodes);
if (code != Coordination::Error::ZOK)
{
LOG_TEST(log, "A set of nodes is empty");
if (code == Coordination::Error::ZNONODE)
{
LOG_TEST(log, "Path {} does not exist", zookeeper_failed_path.string());
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
}
const size_t nodes_num = processed_nodes.size() + failed_nodes.size();
if (!nodes_num)
{
LOG_TEST(log, "There are neither processed nor failed nodes");
return;
}
const bool nodes_limit_exceeded = nodes.size() > max_set_size;
if (!nodes_limit_exceeded && check_nodes_limit && !check_nodes_ttl)
chassert(max_set_size || max_set_age_sec);
const bool check_nodes_limit = max_set_size > 0;
const bool check_nodes_ttl = max_set_age_sec > 0;
const bool nodes_limit_exceeded = nodes_num > max_set_size;
if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl)
{
LOG_TEST(log, "No limit exceeded");
return;
}
LOG_TRACE(log, "Will check limits for {} nodes", nodes_num);
/// Create a lock so that with distributed processing
/// multiple nodes do not execute cleanup in parallel.
auto ephemeral_node = zkutil::EphemeralNodeHolder::tryCreate(zookeeper_cleanup_lock_path, *zk_client, toString(getCurrentTime()));
@ -1014,7 +1037,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
struct Node
{
std::string name;
std::string zk_path;
NodeMetadata metadata;
};
auto node_cmp = [](const Node & a, const Node & b)
@ -1026,24 +1049,57 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
/// Ordered in ascending order of timestamps.
std::set<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
LOG_TRACE(log, "Found {} nodes", nodes.size());
for (const auto & node : nodes)
for (const auto & node : processed_nodes)
{
const std::string path = zookeeper_processed_path / node;
try
{
std::string metadata_str;
if (zk_client->tryGet(zookeeper_processed_path / node, metadata_str))
if (zk_client->tryGet(path, metadata_str))
{
sorted_nodes.emplace(node, NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", node);
sorted_nodes.emplace(path, NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", path);
}
else
LOG_TEST(log, "Failed to fetch node metadata {}", node);
LOG_ERROR(log, "Failed to fetch node metadata {}", path);
}
catch (...)
catch (const zkutil::KeeperException & e)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (e.code != Coordination::Error::ZCONNECTIONLOSS)
{
LOG_WARNING(log, "Unexpected exception: {}", getCurrentExceptionMessage(true));
chassert(false);
}
/// Will retry with a new zk connection.
throw;
}
}
for (const auto & node : failed_nodes)
{
const std::string path = zookeeper_failed_path / node;
try
{
std::string metadata_str;
if (zk_client->tryGet(path, metadata_str))
{
sorted_nodes.emplace(path, NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", path);
}
else
LOG_ERROR(log, "Failed to fetch node metadata {}", path);
}
catch (const zkutil::KeeperException & e)
{
if (e.code != Coordination::Error::ZCONNECTIONLOSS)
{
LOG_WARNING(log, "Unexpected exception: {}", getCurrentExceptionMessage(true));
chassert(false);
}
/// Will retry with a new zk connection.
throw;
}
}
@ -1056,37 +1112,35 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl()
};
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", max_set_size, max_set_age_sec, get_nodes_str());
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0;
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - max_set_size : 0;
for (const auto & node : sorted_nodes)
{
if (nodes_to_remove)
{
auto path = zookeeper_processed_path / node.name;
LOG_TEST(log, "Removing node at path {} ({}) because max files limit is reached",
node.metadata.file_path, path.string());
LOG_TRACE(log, "Removing node at path {} ({}) because max files limit is reached",
node.metadata.file_path, node.zk_path);
local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(path);
code = zk_client->tryRemove(node.zk_path);
if (code == Coordination::Error::ZOK)
--nodes_to_remove;
else
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code);
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code);
}
else if (check_nodes_ttl)
{
UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp;
if (node_age >= max_set_age_sec)
{
auto path = zookeeper_processed_path / node.name;
LOG_TEST(log, "Removing node at path {} ({}) because file is reached",
node.metadata.file_path, path.string());
LOG_TRACE(log, "Removing node at path {} ({}) because file is reached",
node.metadata.file_path, node.zk_path);
local_file_statuses.remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(path);
code = zk_client->tryRemove(node.zk_path);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code);
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code);
}
else if (!nodes_to_remove)
{

View File

@ -193,7 +193,8 @@ public:
const std::string & path_,
const std::string & zk_node_path_,
FileStatusPtr file_status_,
zkutil::ZooKeeperPtr zk_client_);
zkutil::ZooKeeperPtr zk_client_,
LoggerPtr logger_);
~ProcessingNodeHolder();

View File

@ -21,7 +21,8 @@ S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3
auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end())
{
it = metadata_by_path.emplace(zookeeper_path, std::make_shared<S3QueueFilesMetadata>(fs::path(zookeeper_path), settings)).first;
auto files_metadata = std::make_shared<S3QueueFilesMetadata>(zookeeper_path, settings);
it = metadata_by_path.emplace(zookeeper_path, std::move(files_metadata)).first;
}
else if (it->second.metadata->checkSettings(settings))
{

View File

@ -44,11 +44,12 @@ StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
size_t current_shard_,
std::atomic<bool> & shutdown_called_)
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_)
: metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)
, log(&Poco::Logger::get("StorageS3QueueSource"))
, log(logger_)
, sharded_processing(metadata->isShardedProcessing())
, current_shard(current_shard_)
{
@ -237,7 +238,8 @@ Chunk StorageS3QueueSource::generate()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "Failed to set file {} as failed: {}",
key_with_info->key, getCurrentExceptionMessage(true));
}
appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);
@ -263,7 +265,8 @@ Chunk StorageS3QueueSource::generate()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "Failed to set file {} as failed: {}",
key_with_info->key, getCurrentExceptionMessage(true));
}
appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);

View File

@ -42,7 +42,8 @@ public:
std::shared_ptr<S3QueueFilesMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
size_t current_shard_,
std::atomic<bool> & shutdown_called_);
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_);
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
@ -56,7 +57,7 @@ public:
const std::unique_ptr<GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
std::mutex mutex;
Poco::Logger * log;
LoggerPtr log;
const bool sharded_processing;
const size_t current_shard;

View File

@ -116,7 +116,7 @@ StorageS3Queue::StorageS3Queue(
, configuration{configuration_}
, format_settings(format_settings_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, log(getLogger("StorageS3Queue (" + table_id_.table_name + ")"))
, log(getLogger("StorageS3Queue (" + table_id_.getFullTableName() + ")"))
{
if (configuration.url.key.empty())
{
@ -170,14 +170,7 @@ StorageS3Queue::StorageS3Queue(
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
try
{
createOrCheckMetadata(storage_metadata);
}
catch (...)
{
throw;
}
createOrCheckMetadata(storage_metadata);
/// Get metadata manager from S3QueueMetadataFactory,
/// it will increase the ref count for the metadata object.
@ -469,7 +462,7 @@ void StorageS3Queue::threadFunc()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
LOG_ERROR(log, "Failed to process data: {}", getCurrentExceptionMessage(true));
}
if (!shutdown_called)
@ -562,17 +555,21 @@ void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & stora
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "metadata", metadata, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
auto code = zookeeper->tryMulti(requests, responses);
if (code == Coordination::Error::ZNODEEXISTS)
if (!requests.empty())
{
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string());
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, requests, responses);
Coordination::Responses responses;
auto code = zookeeper->tryMulti(requests, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string());
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, requests, responses);
}
}
return;
}
@ -614,7 +611,8 @@ std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator
*configuration.client, configuration.url, predicate, getVirtualsList(), local_context,
/* read_keys */nullptr, configuration.request_settings);
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called);
return std::make_shared<FileIterator>(
files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called, log);
}
void registerStorageS3Queue(StorageFactory & factory)

View File

@ -1806,7 +1806,7 @@ def _upload_build_profile_data(
address,
size,
type,
symbol,
symbol
)
SELECT {pr_info.number}, '{pr_info.sha}', '{job_report.start_time}', '{build_name}', '{instance_type}', '{instance_id}',
file, reinterpretAsUInt64(reverse(unhex(address))), reinterpretAsUInt64(reverse(unhex(size))), type, symbol

View File

@ -4294,6 +4294,9 @@ class ClickHouseInstance:
)
return xml_str
def get_machine_name(self):
return platform.machine()
@property
def odbc_drivers(self):
if self.odbc_ini_path:
@ -4301,12 +4304,12 @@ class ClickHouseInstance:
"SQLite3": {
"DSN": "sqlite3_odbc",
"Database": "/tmp/sqliteodbc",
"Driver": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
"Setup": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so",
"Driver": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libsqlite3odbc.so",
"Setup": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libsqlite3odbc.so",
},
"MySQL": {
"DSN": "mysql_odbc",
"Driver": "/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so",
"Driver": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libmyodbc.so",
"Database": odbc_mysql_db,
"Uid": odbc_mysql_uid,
"Pwd": odbc_mysql_pass,
@ -4323,8 +4326,8 @@ class ClickHouseInstance:
"ReadOnly": "No",
"RowVersioning": "No",
"ShowSystemTables": "No",
"Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so",
"Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so",
"Driver": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/psqlodbca.so",
"Setup": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libodbcpsqlS.so",
"ConnSettings": "",
},
}

View File

@ -1,13 +1,14 @@
import pytest
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION, is_arm
# For arm version see https://github.com/ClickHouse/ClickHouse/pull/59132
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
with_zookeeper=False,
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
tag="24.1" if is_arm() else CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
)
@ -15,7 +16,7 @@ node2 = cluster.add_instance(
"node2",
with_zookeeper=False,
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
tag="24.1" if is_arm() else CLICKHOUSE_CI_MIN_TESTED_VERSION,
stay_alive=True,
with_installed_binary=True,
)

View File

@ -23,13 +23,23 @@ def ch_cluster():
try:
cluster.start()
os.system(
"docker cp {local} {cont_id}:{dist}".format(
local=os.path.join(SCRIPT_DIR, "model/."),
cont_id=instance.docker_id,
dist="/etc/clickhouse-server/model",
instance.exec_in_container(["mkdir", f"/etc/clickhouse-server/model/"])
machine = instance.get_machine_name()
for source_name in os.listdir(os.path.join(SCRIPT_DIR, "model/.")):
dest_name = source_name
if machine in source_name:
machine_suffix = "_" + machine
dest_name = source_name[: -len(machine_suffix)]
os.system(
"docker cp {local} {cont_id}:{dist}".format(
local=os.path.join(SCRIPT_DIR, f"model/{source_name}"),
cont_id=instance.docker_id,
dist=f"/etc/clickhouse-server/model/{dest_name}",
)
)
)
instance.restart_clickhouse()
yield cluster

View File

@ -39,6 +39,10 @@ def wait_for_clickhouse_stop(started_node):
assert result == "OK", "ClickHouse process is still running"
@pytest.mark.skipif(
helpers.cluster.is_arm(),
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
)
def test_pkill(started_node):
if (
started_node.is_built_with_thread_sanitizer()
@ -59,6 +63,10 @@ def test_pkill(started_node):
)
@pytest.mark.skipif(
helpers.cluster.is_arm(),
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
)
def test_pkill_query_log(started_node):
for signal in ["SEGV", "4"]:
# force create query_log if it was not created

View File

@ -0,0 +1,17 @@
<clickhouse>
<storage_configuration>
<disks>
<disk_s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3>
<disk_encrypted>
<type>encrypted</type>
<disk>disk_s3</disk>
<key>1234567812345678</key>
</disk_encrypted>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -19,7 +19,9 @@ def cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/storage.xml"],
main_configs=["configs/storage_arm.xml"]
if is_arm()
else ["configs/storage_amd.xml"],
with_minio=True,
with_hdfs=not is_arm(),
)

View File

@ -1,3 +1,4 @@
<clickhouse>
<aggregate_function_group_array_max_element_size>10</aggregate_function_group_array_max_element_size>
<aggregate_function_group_array_has_limit_size>false</aggregate_function_group_array_has_limit_size>
</clickhouse>

View File

@ -9,6 +9,12 @@ node1 = cluster.add_instance(
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/group_array_max_element_size.xml"],
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
@ -63,3 +69,33 @@ def test_max_exement_size(started_cluster):
node1.restart_clickhouse()
assert node1.query("select length(groupArrayMerge(x)) from tab3") == "21\n"
def test_limit_size(started_cluster):
node2.query(
"CREATE TABLE tab4 (x AggregateFunction(groupArray, Array(UInt8))) ENGINE = MergeTree ORDER BY tuple()"
)
node2.query("insert into tab4 select groupArrayState([zero]) from zeros(10)")
assert node2.query("select length(groupArrayMerge(x)) from tab4") == "10\n"
node2.replace_in_config(
"/etc/clickhouse-server/config.d/group_array_max_element_size.xml",
"false",
"true",
)
node2.restart_clickhouse()
node2.query("insert into tab4 select groupArrayState([zero]) from zeros(100)")
assert node2.query("select length(groupArrayMerge(x)) from tab4") == "10\n"
node2.replace_in_config(
"/etc/clickhouse-server/config.d/group_array_max_element_size.xml",
"true",
"false",
)
node2.restart_clickhouse()
with pytest.raises(Exception, match=r"Too large array size"):
node2.query("insert into tab4 select groupArrayState([zero]) from zeros(11)")

View File

@ -2,7 +2,7 @@ import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
cluster = ClickHouseCluster(__file__)
@ -255,6 +255,11 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster):
def test_preconfigured_deflateqpl_codec(start_cluster):
if is_arm():
pytest.skip(
"Skipping test because it's special test for Intel code (doesn't work on ARM)"
)
node6.query(
"""
CREATE TABLE compression_codec_multiple_with_key (

View File

@ -79,6 +79,29 @@ def started_cluster():
cluster.shutdown()
def test_flatten_nested(started_cluster):
main_node.query(
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');"
)
dummy_node.query(
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica2');"
)
main_node.query(
"CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
)
main_node.query(
"CREATE MATERIALIZED VIEW create_replicated_table.mv ENGINE=ReplicatedMergeTree ORDER BY tuple() AS select d, cast([(k, toString(i32))] as Nested(a UInt64, b String)) from create_replicated_table.replicated_table"
)
assert main_node.query(
"show create create_replicated_table.mv"
) == dummy_node.query("show create create_replicated_table.mv")
main_node.query("DROP DATABASE create_replicated_table SYNC")
dummy_node.query("DROP DATABASE create_replicated_table SYNC")
def test_create_replicated_table(started_cluster):
main_node.query(
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');"

View File

@ -4,6 +4,7 @@
<disk_s3_plain_rewritable>
<type>s3_plain_rewritable</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<endpoint_subpath from_env="ENDPOINT_SUBPATH"></endpoint_subpath>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</disk_s3_plain_rewritable>

View File

@ -1,24 +1,39 @@
import pytest
import random
import string
import threading
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/storage_conf.xml"],
with_minio=True,
stay_alive=True,
)
insert_values = [
"(0,'data'),(1,'data')",
",".join(
NUM_WORKERS = 5
nodes = []
for i in range(NUM_WORKERS):
name = "node{}".format(i + 1)
node = cluster.add_instance(
name,
main_configs=["configs/storage_conf.xml"],
env_variables={"ENDPOINT_SUBPATH": name},
with_minio=True,
stay_alive=True,
)
nodes.append(node)
MAX_ROWS = 1000
def gen_insert_values(size):
return ",".join(
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')"
for i in range(10)
),
]
for i in range(size)
)
insert_values = ",".join(
f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')" for i in range(10)
)
@pytest.fixture(scope="module", autouse=True)
@ -32,47 +47,71 @@ def start_cluster():
@pytest.mark.order(0)
def test_insert():
for index, value in enumerate(insert_values):
def create_insert(node, insert_values):
node.query(
"""
CREATE TABLE test_{} (
CREATE TABLE test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3_plain_rewritable'
""".format(
index
)
"""
)
node.query("INSERT INTO test VALUES {}".format(insert_values))
node.query("INSERT INTO test_{} VALUES {}".format(index, value))
insert_values_arr = [
gen_insert_values(random.randint(1, MAX_ROWS)) for _ in range(0, NUM_WORKERS)
]
threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(
target=create_insert, args=(nodes[i], insert_values_arr[i])
)
threads.append(t)
t.start()
for t in threads:
t.join()
for i in range(NUM_WORKERS):
assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
== value
nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
== insert_values_arr[i]
)
@pytest.mark.order(1)
def test_restart():
for index, value in enumerate(insert_values):
assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
== value
insert_values_arr = []
for i in range(NUM_WORKERS):
insert_values_arr.append(
nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
)
node.restart_clickhouse()
for index, value in enumerate(insert_values):
def restart(node):
node.restart_clickhouse()
threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(target=restart, args=(nodes[i],))
threads.append(t)
t.start()
for t in threads:
t.join()
for i in range(NUM_WORKERS):
assert (
node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
== value
nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
== insert_values_arr[i]
)
@pytest.mark.order(2)
def test_drop():
for index, value in enumerate(insert_values):
node.query("DROP TABLE IF EXISTS test_{} SYNC".format(index))
for i in range(NUM_WORKERS):
nodes[i].query("DROP TABLE IF EXISTS test SYNC")
it = cluster.minio_client.list_objects(
cluster.minio_bucket, "data/", recursive=True

View File

@ -35,6 +35,10 @@ def started_node():
pass
@pytest.mark.skipif(
helpers.cluster.is_arm(),
reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
)
def test_send_segfault(started_node):
# NOTE: another option is to increase waiting time.
if (

View File

@ -783,6 +783,7 @@ def test_max_set_age(started_cluster):
"s3queue_tracked_file_ttl_sec": max_age,
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
"s3queue_loading_retries": 0,
},
)
create_mv(node, table_name, dst_table_name)
@ -829,6 +830,61 @@ def test_max_set_age(started_cluster):
for path_count in paths_count:
assert 2 == path_count
failed_count = int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, f"{files_path}/fff.csv", values_csv)
for _ in range(30):
if failed_count + 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
):
break
time.sleep(1)
assert failed_count + 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
node.query("SYSTEM FLUSH LOGS")
assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'"
)
assert 1 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
)
)
time.sleep(max_age + 1)
assert failed_count + 2 == int(
node.query("SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles'")
)
node.query("SYSTEM FLUSH LOGS")
assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv' ORDER BY processing_end_time DESC LIMIT 1"
)
assert 2 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
)
)
def test_max_set_size(started_cluster):
node = started_cluster.instances["instance"]
@ -902,9 +958,9 @@ def test_drop_table(started_cluster):
node.wait_for_log_line(f"Reading from file: test_drop_data")
node.query(f"DROP TABLE {table_name} SYNC")
assert node.contains_in_log(
f"StorageS3Queue ({table_name}): Table is being dropped"
f"StorageS3Queue (default.{table_name}): Table is being dropped"
) or node.contains_in_log(
f"StorageS3Queue ({table_name}): Shutdown was called, stopping sync"
f"StorageS3Queue (default.{table_name}): Shutdown was called, stopping sync"
)

View File

@ -2,3 +2,6 @@
{'a':1,'b':2}
{'a':1,'b':2}
{'a':1,'b':2}
{}
{}
{'':''}

View File

@ -1,2 +1,8 @@
SELECT map('a', 1, 'b', 2) FROM remote('127.0.0.{1,2}', system, one);
SELECT map('a', 1, 'b', 2) FROM remote('127.0.0.{1,2}');
SELECT map() from remote('127.0.0.{1,2}', system,one);
drop table if exists bug_repro_local;
CREATE TABLE bug_repro_local (`attributes` Map(LowCardinality(String), String)) ENGINE = Log as select map('','');
SELECT if(1, attributes, map()) from remote('127.0.0.{1,2}', currentDatabase(), bug_repro_local) limit 1;

View File

@ -9,7 +9,7 @@ USER_FILES_PATH=$($CLICKHOUSE_CLIENT --query "select _path,_file from file('none
cp "$CUR_DIR"/data_csv/10m_rows.csv.xz $USER_FILES_PATH/
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings max_threads=1, max_memory_usage=1000000000"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings max_threads=1, max_memory_usage=100000000"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings input_format_parallel_parsing=1, max_threads=1, max_parsing_threads=16, min_chunk_bytes_for_parallel_parsing=10485760, max_memory_usage=1000000000"
${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings input_format_parallel_parsing=1, max_threads=1, max_parsing_threads=16, min_chunk_bytes_for_parallel_parsing=10485760, max_memory_usage=100000000"
rm $USER_FILES_PATH/10m_rows.csv.xz

View File

@ -0,0 +1,14 @@
-- https://github.com/ClickHouse/ClickHouse/issues/48049
SET allow_experimental_analyzer = 1;
CREATE TABLE test_table (`id` UInt64, `value` String) ENGINE = TinyLog() AS Select number, number::String from numbers(10);
WITH CAST(tuple(1), 'Tuple (value UInt64)') AS compound_value
SELECT id, test_table.* APPLY x -> compound_value.*
FROM test_table
WHERE arrayMap(x -> toString(x) AS lambda, [NULL, 256, 257, NULL, NULL])
SETTINGS convert_query_to_cnf = true, optimize_using_constraints = true, optimize_substitute_columns = true; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
DESCRIBE TABLE (SELECT test_table.COLUMNS(id) FROM test_table WHERE '2147483647'); -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
DROP TABLE test_table;

View File

@ -0,0 +1 @@
9900 49990050 49990050 49990050

View File

@ -0,0 +1,8 @@
SET join_algorithm = 'partial_merge';
SET max_joined_block_size_rows = 100;
SELECT count(ignore(*)), sum(t1.a), sum(t1.b), sum(t2.a)
FROM ( SELECT number AS a, number AS b FROM numbers(10000) ) t1
JOIN ( SELECT number + 100 AS a FROM numbers(10000) ) t2
ON t1.a = t2.a AND t1.b > 0;

View File

@ -0,0 +1,7 @@
1
2
3
--------
1 2023-01-14 00:00:00
2 2023-01-14 00:00:00
3 2023-01-14 00:00:00

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS tab;
DROP TABLE IF EXISTS tab_v;
CREATE TABLE tab (id Int32, val Nullable(Float64), dt Nullable(DateTime64(6)), type Nullable(Int32)) ENGINE = MergeTree ORDER BY id;
insert into tab values (1,10,'2023-01-14 00:00:00',1),(2,20,'2023-01-14 00:00:00',1),(3,20,'2023-01-14 00:00:00',2),(4,40,'2023-01-14 00:00:00',3),(5,50,'2023-01-14 00:00:00',3);
CREATE VIEW tab_v AS SELECT
t1.type AS type,
sum(t1.val) AS sval,
toStartOfDay(t1.dt) AS sday,
anyLast(sval) OVER w AS lval
FROM tab AS t1
GROUP BY
type,
sday
WINDOW w AS (PARTITION BY type);
select distinct type from tab_v order by type;
select '--------';
select distinct type, sday from tab_v order by type, sday;

View File

@ -0,0 +1,4 @@
CREATE TEMPORARY TABLE test (a UInt8, b UInt8, c UInt8);
INSERT INTO test (a, b, c, ) VALUES (1, 2, 3);
INSERT INTO test (a, b, c) VALUES (4, 5, 6);
SELECT * FROM test ORDER BY a;

View File

@ -0,0 +1,2 @@
regular expression is not set
`Upyachka`

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-ordinary-database, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_LOCAL} --query "SELECT * FROM format(Regexp, 's String', 'Hello')" 2>&1 | grep -o -F 'regular expression is not set'
${CLICKHOUSE_LOCAL} --query "SELECT * FROM format(Regexp, 's String', 'Hello') SETTINGS format_regexp = 'Upyachka'" 2>&1 | grep -o -F '`Upyachka`'

View File

@ -0,0 +1,3 @@
1 2 3
4 5 6
7 8 9

View File

@ -0,0 +1,5 @@
CREATE TEMPORARY TABLE test (a UInt8, b UInt8, c UInt8);
INSERT INTO test (a, b, c) VALUES (1, 2, 3, );
INSERT INTO test (a, b, c) VALUES (4, 5, 6,);
INSERT INTO test (a, b, c) VALUES (7, 8, 9);
SELECT * FROM test ORDER BY a;

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-ordinary-database, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# In previous versions this command took longer than ten minutes. Now it takes less than a second in release mode:
python3 -c 'import sys; import struct; sys.stdout.buffer.write(b"".join(struct.pack("<Q", 36) + b"\x40" + f"{i:064}".encode("ascii") for i in range(1024 * 1024)))' |
${CLICKHOUSE_CURL} "${CLICKHOUSE_URL}&max_query_size=100000000&query=INSERT+INTO+FUNCTION+null('timestamp+UInt64,+label+String')+FORMAT+RowBinary" --data-binary @-

View File

@ -2398,6 +2398,7 @@ sharded
sharding
shortcircuit
shortkeys
showCertificate
shoutout
simdjson
simpleJSON