Merge remote-tracking branch 'upstream/master' into HEAD

This commit is contained in:
Anton Popov 2024-11-13 10:32:46 +00:00
commit 511045840b
69 changed files with 1320 additions and 277 deletions

View File

@ -145,6 +145,7 @@
#define TSA_TRY_ACQUIRE_SHARED(...) __attribute__((try_acquire_shared_capability(__VA_ARGS__))) /// function tries to acquire a shared capability and returns a boolean value indicating success or failure
#define TSA_RELEASE_SHARED(...) __attribute__((release_shared_capability(__VA_ARGS__))) /// function releases the given shared capability
#define TSA_SCOPED_LOCKABLE __attribute__((scoped_lockable)) /// object of a class has scoped lockable capability
#define TSA_RETURN_CAPABILITY(...) __attribute__((lock_returned(__VA_ARGS__))) /// to return capabilities in functions
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// They use a lambda function to apply function attribute to a single statement. This enable us to suppress warnings locally instead of

View File

@ -113,7 +113,9 @@ RUN clickhouse local -q 'SELECT 1' >/dev/null 2>&1 && exit 0 || : \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
&& apt-get autoremove --purge -yq dirmngr gnupg2
&& apt-get autoremove --purge -yq dirmngr gnupg2 \
&& chmod ugo+Xrw -R /etc/clickhouse-server /etc/clickhouse-client
# The last chmod is here to make the next one is No-op in docker official library Dockerfile
# post install
# we need to allow "others" access to clickhouse folder, because docker container

View File

@ -4773,7 +4773,7 @@ Result:
## toUTCTimestamp
Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp
Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@ -4799,14 +4799,14 @@ SELECT toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai');
Result:
``` text
┌─toUTCTimestamp(toDateTime('2023-03-16'),'Asia/Shanghai')┐
┌─toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai')┐
│ 2023-03-15 16:00:00 │
└─────────────────────────────────────────────────────────┘
```
## fromUTCTimestamp
Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp
Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@ -4832,7 +4832,7 @@ SELECT fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00', 3), 'Asia/Shanghai')
Result:
``` text
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3),'Asia/Shanghai')─┐
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3), 'Asia/Shanghai')─┐
│ 2023-03-16 18:00:00.000 │
└─────────────────────────────────────────────────────────────────────────┘
```

View File

@ -279,7 +279,7 @@ For columns with a new or updated `MATERIALIZED` value expression, all existing
For columns with a new or updated `DEFAULT` value expression, the behavior depends on the ClickHouse version:
- In ClickHouse < v24.2, all existing rows are rewritten.
- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
Syntax:

View File

@ -88,6 +88,7 @@ void FunctionNode::resolveAsFunction(FunctionBasePtr function_value)
function_name = function_value->getName();
function = std::move(function_value);
kind = FunctionKind::ORDINARY;
nulls_action = NullsAction::EMPTY;
}
void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_function_value)
@ -95,6 +96,12 @@ void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_fun
function_name = aggregate_function_value->getName();
function = std::move(aggregate_function_value);
kind = FunctionKind::AGGREGATE;
/** When the function is resolved, we do not need the nulls action anymore.
* The only thing that the nulls action does is map from one function to another.
* Thus, the nulls action is encoded in the function name and does not make sense anymore.
* Keeping the nulls action may lead to incorrect comparison of functions, e.g., count() and count() IGNORE NULLS are the same function.
*/
nulls_action = NullsAction::EMPTY;
}
void FunctionNode::resolveAsWindowFunction(AggregateFunctionPtr window_function_value)

View File

@ -48,9 +48,15 @@ ASTPtr JoinNode::toASTTableJoin() const
auto join_expression_ast = children[join_expression_child_index]->toAST();
if (is_using_join_expression)
join_ast->using_expression_list = std::move(join_expression_ast);
{
join_ast->using_expression_list = join_expression_ast;
join_ast->children.push_back(join_ast->using_expression_list);
}
else
join_ast->on_expression = std::move(join_expression_ast);
{
join_ast->on_expression = join_expression_ast;
join_ast->children.push_back(join_ast->on_expression);
}
}
return join_ast;

View File

@ -85,10 +85,9 @@ QueryTreeNodePtr createResolvedFunction(const ContextPtr & context, const String
}
FunctionNodePtr createResolvedAggregateFunction(
const String & name, const QueryTreeNodePtr & argument, const Array & parameters = {}, NullsAction action = NullsAction::EMPTY)
const String & name, const QueryTreeNodePtr & argument, const Array & parameters = {})
{
auto function_node = std::make_shared<FunctionNode>(name);
function_node->setNullsAction(action);
if (!parameters.empty())
{
@ -100,7 +99,7 @@ FunctionNodePtr createResolvedAggregateFunction(
function_node->getArguments().getNodes() = { argument };
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get(name, action, {argument->getResultType()}, parameters, properties);
auto aggregate_function = AggregateFunctionFactory::instance().get(name, NullsAction::EMPTY, {argument->getResultType()}, parameters, properties);
function_node->resolveAsAggregateFunction(std::move(aggregate_function));
return function_node;

View File

@ -3,7 +3,6 @@
#include <memory>
#include <Common/Exception.h>
#include "Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h"
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
@ -16,39 +15,39 @@
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/AggregateFunctionOfGroupByKeysPass.h>
#include <Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Analyzer/Passes/IfChainToMultiIfPass.h>
#include <Analyzer/Passes/IfConstantConditionPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/MultiIfToIfPass.h>
#include <Analyzer/Passes/NormalizeCountVariantsPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/OrderByLimitByDuplicateEliminationPass.h>
#include <Analyzer/Passes/OrderByTupleEliminationPass.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/RemoveUnusedProjectionColumnsPass.h>
#include <Analyzer/Passes/RewriteSumFunctionWithSumAndCountPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/RewriteAggregateFunctionWithIfPass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/MultiIfToIfPass.h>
#include <Analyzer/Passes/IfConstantConditionPass.h>
#include <Analyzer/Passes/IfChainToMultiIfPass.h>
#include <Analyzer/Passes/OrderByTupleEliminationPass.h>
#include <Analyzer/Passes/NormalizeCountVariantsPass.h>
#include <Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/OrderByLimitByDuplicateEliminationPass.h>
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/RewriteSumFunctionWithSumAndCountPass.h>
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/AggregateFunctionOfGroupByKeysPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Utils.h>
namespace DB
{

View File

@ -1,3 +1,4 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypesNumber.h>
@ -676,6 +677,8 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
"tuple"});
}
}
logProcessorProfile(context, io.pipeline.getProcessors());
}
scalars_cache.emplace(node_with_hash, scalar_block);

View File

@ -331,7 +331,7 @@ AsynchronousMetrics::~AsynchronousMetrics()
AsynchronousMetricValues AsynchronousMetrics::getValues() const
{
std::lock_guard lock(data_mutex);
SharedLockGuard lock(values_mutex);
return values;
}
@ -1807,7 +1807,10 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
first_run = false;
// Finally, update the current metrics.
values = new_values;
{
std::lock_guard values_lock(values_mutex);
values.swap(new_values);
}
}
}

View File

@ -4,6 +4,7 @@
#include <Common/MemoryStatisticsOS.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <Common/SharedMutex.h>
#include <IO/ReadBufferFromFile.h>
#include <condition_variable>
@ -100,6 +101,7 @@ private:
std::condition_variable wait_cond;
bool quit TSA_GUARDED_BY(thread_mutex) = false;
/// Protects all raw data and serializes multiple updates.
mutable std::mutex data_mutex;
/// Some values are incremental and we have to calculate the difference.
@ -107,7 +109,15 @@ private:
bool first_run TSA_GUARDED_BY(data_mutex) = true;
TimePoint previous_update_time TSA_GUARDED_BY(data_mutex);
AsynchronousMetricValues values TSA_GUARDED_BY(data_mutex);
/// Protects saved values.
mutable SharedMutex values_mutex;
/// Values store the result of the last update prepared for reading.
#ifdef OS_LINUX
AsynchronousMetricValues values TSA_GUARDED_BY(values_mutex);
#else
/// When SharedMutex == std::shared_mutex it may not be annotated with the 'capability'.
AsynchronousMetricValues values;
#endif
#if defined(OS_LINUX) || defined(OS_FREEBSD)
MemoryStatisticsOS memory_stat TSA_GUARDED_BY(data_mutex);

View File

@ -1,23 +1,47 @@
#pragma once
#include <Common/OvercommitTracker.h>
#include <base/defines.h>
#include <Common/Exception.h>
#include <Common/OvercommitTracker.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
};
/** LockGuard provides RAII-style locking mechanism for a mutex.
** It's intended to be used like std::unique_ptr but with TSA annotations
** It's intended to be used like std::unique_lock but with TSA annotations
*/
template <typename Mutex>
class TSA_SCOPED_LOCKABLE LockGuard
{
public:
explicit LockGuard(Mutex & mutex_) TSA_ACQUIRE(mutex_) : mutex(mutex_) { mutex.lock(); }
~LockGuard() TSA_RELEASE() { mutex.unlock(); }
explicit LockGuard(Mutex & mutex_) TSA_ACQUIRE(mutex_) : mutex(mutex_) { lock(); }
~LockGuard() TSA_RELEASE() { if (locked) unlock(); }
void lock() TSA_ACQUIRE()
{
/// Don't allow recursive_mutex for now.
if (locked)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't lock twice the same mutex");
mutex.lock();
locked = true;
}
void unlock() TSA_RELEASE()
{
if (!locked)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't unlock the mutex without locking it first");
mutex.unlock();
locked = false;
}
private:
Mutex & mutex;
bool locked = false;
};
template <template<typename> typename TLockGuard, typename Mutex>

View File

@ -4875,6 +4875,9 @@ Limit on size of a single batch of file segments that a read buffer can request
)", 0) \
DECLARE(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, R"(
Wait time to lock cache for space reservation in filesystem cache
)", 0) \
DECLARE(Bool, filesystem_cache_prefer_bigger_buffer_size, true, R"(
Prefer bigger buffer size if filesystem cache is enabled to avoid writing small file segments which deteriorate cache performance. On the other hand, enabling this setting might increase memory usage.
)", 0) \
DECLARE(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), R"(
Wait time to lock cache for space reservation for temporary data in filesystem cache

View File

@ -77,6 +77,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."},
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
{"read_in_order_use_virtual_row", false, false, "Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched."},
}
},

View File

@ -642,7 +642,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
};
/// Avoid cache fragmentation by choosing bigger buffer size.
bool prefer_bigger_buffer_size = object_storage->supportsCache() && read_settings.enable_filesystem_cache;
bool prefer_bigger_buffer_size = read_settings.filesystem_cache_prefer_bigger_buffer_size
&& object_storage->supportsCache()
&& read_settings.enable_filesystem_cache;
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: settings.remote_fs_buffer_size;

View File

@ -4410,7 +4410,7 @@ private:
variant_column = IColumn::mutate(column);
/// Otherwise we should filter column.
else
variant_column = column->filter(filter, variant_size_hint)->assumeMutable();
variant_column = IColumn::mutate(column->filter(filter, variant_size_hint));
assert_cast<ColumnLowCardinality &>(*variant_column).nestedRemoveNullable();
return createVariantFromDescriptorsAndOneNonEmptyVariant(variant_types, std::move(discriminators), std::move(variant_column), variant_discr);

View File

@ -61,6 +61,7 @@ struct ReadSettings
bool filesystem_cache_allow_background_download = true;
bool filesystem_cache_allow_background_download_for_metadata_files_in_packed_storage = true;
bool filesystem_cache_allow_background_download_during_fetch = true;
bool filesystem_cache_prefer_bigger_buffer_size = true;
bool use_page_cache_for_disks_without_file_cache = false;
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;

View File

@ -196,6 +196,7 @@ namespace Setting
extern const SettingsUInt64 filesystem_cache_segments_batch_size;
extern const SettingsBool filesystem_cache_enable_background_download_for_metadata_files_in_packed_storage;
extern const SettingsBool filesystem_cache_enable_background_download_during_fetch;
extern const SettingsBool filesystem_cache_prefer_bigger_buffer_size;
extern const SettingsBool http_make_head_request;
extern const SettingsUInt64 http_max_fields;
extern const SettingsUInt64 http_max_field_name_size;
@ -5751,6 +5752,7 @@ ReadSettings Context::getReadSettings() const
res.filesystem_cache_allow_background_download_for_metadata_files_in_packed_storage
= settings_ref[Setting::filesystem_cache_enable_background_download_for_metadata_files_in_packed_storage];
res.filesystem_cache_allow_background_download_during_fetch = settings_ref[Setting::filesystem_cache_enable_background_download_during_fetch];
res.filesystem_cache_prefer_bigger_buffer_size = settings_ref[Setting::filesystem_cache_prefer_bigger_buffer_size];
res.filesystem_cache_max_download_size = settings_ref[Setting::filesystem_cache_max_download_size];
res.skip_download_if_exceeds_query_cache = settings_ref[Setting::skip_download_if_exceeds_query_cache];

View File

@ -5,9 +5,11 @@
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTExpressionList.h>
@ -19,9 +21,8 @@
#include <Parsers/ASTWithElement.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Common/ProfileEvents.h>
#include <Common/FieldVisitorToString.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
@ -67,6 +68,18 @@ bool ExecuteScalarSubqueriesMatcher::needChildVisit(ASTPtr & node, const ASTPtr
return false;
}
if (auto * tables = node->as<ASTTablesInSelectQueryElement>())
{
/// Contrary to what's said in the code block above, ARRAY JOIN needs to resolve the subquery if possible
/// and assign an alias for 02367_optimize_trivial_count_with_array_join to pass. Otherwise it will fail in
/// ArrayJoinedColumnsVisitor (`No alias for non-trivial value in ARRAY JOIN: _a`)
/// This looks 100% as a incomplete code working on top of a bug, but this code has already been made obsolete
/// by the new analyzer, so it's an inconvenience we can live with until we deprecate it.
if (child == tables->array_join)
return true;
return false;
}
return true;
}
@ -246,6 +259,8 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
logProcessorProfile(data.getContext(), io.pipeline.getProcessors());
}
block = materializeBlock(block);

View File

@ -1,21 +1,22 @@
#include <chrono>
#include <variant>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/Set.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <IO/Operators.h>
#include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <IO/Operators.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/Set.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Core/Block.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -239,6 +240,8 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
if (!set_and_key->set->isCreated())
return nullptr;
logProcessorProfile(context, pipeline.getProcessors());
return set_and_key->set;
}

View File

@ -1,5 +1,6 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -8,16 +9,19 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <base/getFQDNOrHostName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DateLUTImpl.h>
#include <Common/logger_useful.h>
#include <array>
namespace DB
{
namespace Setting
{
extern const SettingsBool log_processors_profiles;
}
ColumnsDescription ProcessorProfileLogElement::getColumnsDescription()
{
return ColumnsDescription
@ -81,5 +85,57 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(output_bytes);
}
void logProcessorProfile(ContextPtr context, const Processors & processors)
{
const Settings & settings = context->getSettingsRef();
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
const auto time_now = std::chrono::system_clock::now();
processor_elem.event_time = timeInSeconds(time_now);
processor_elem.event_time_microseconds = timeInMicroseconds(time_now);
processor_elem.initial_query_id = context->getInitialQueryId();
processor_elem.query_id = context->getCurrentQueryId();
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : processors)
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
}
}

View File

@ -50,4 +50,5 @@ public:
using SystemLog<ProcessorProfileLogElement>::SystemLog;
};
void logProcessorProfile(ContextPtr context, const Processors & processors);
}

View File

@ -1,6 +1,7 @@
#include <base/getFQDNOrHostName.h>
#include <Common/DateLUT.h>
#include <Common/DateLUTImpl.h>
#include <Common/LockGuard.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -16,7 +17,6 @@
#include <chrono>
#include <fmt/chrono.h>
#include <mutex>
namespace DB
@ -24,6 +24,15 @@ namespace DB
static auto logger = getLogger("QueryMetricLog");
String timePointToString(QueryMetricLog::TimePoint time)
{
/// fmtlib supports subsecond formatting in 10.0.0. We're in 9.1.0, so we need to add the milliseconds ourselves.
auto seconds = std::chrono::time_point_cast<std::chrono::seconds>(time);
auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(time - seconds).count();
return fmt::format("{:%Y.%m.%d %H:%M:%S}.{:06}", seconds, microseconds);
}
ColumnsDescription QueryMetricLogElement::getColumnsDescription()
{
ColumnsDescription result;
@ -87,36 +96,73 @@ void QueryMetricLog::shutdown()
Base::shutdown();
}
void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds)
void QueryMetricLog::collectMetric(const ProcessList & process_list, String query_id)
{
QueryMetricLogStatus status;
status.interval_milliseconds = interval_milliseconds;
status.next_collect_time = start_time + std::chrono::milliseconds(interval_milliseconds);
auto current_time = std::chrono::system_clock::now();
const auto query_info = process_list.getQueryInfo(query_id, false, true, false);
if (!query_info)
{
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} is not running anymore, so we couldn't get its QueryStatusInfo", query_id);
return;
}
LockGuard global_lock(queries_mutex);
auto it = queries.find(query_id);
/// The query might have finished while the scheduled task is running.
if (it == queries.end())
{
global_lock.unlock();
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} not found in the list. Finished while this collecting task was running", query_id);
return;
}
auto & query_status = it->second;
if (!query_status.mutex)
{
global_lock.unlock();
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} finished while this collecting task was running", query_id);
return;
}
LockGuard query_lock(query_status.getMutex());
global_lock.unlock();
auto elem = query_status.createLogMetricElement(query_id, *query_info, current_time);
if (elem)
add(std::move(elem.value()));
}
/// We use TSA_NO_THREAD_SAFETY_ANALYSIS to prevent TSA complaining that we're modifying the query_status fields
/// without locking the mutex. Since we're building it from scratch, there's no harm in not holding it.
/// If we locked it to make TSA happy, TSAN build would falsely complain about
/// lock-order-inversion (potential deadlock)
/// which is not a real issue since QueryMetricLogStatus's mutex cannot be locked by anything else
/// until we add it to the queries map.
void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds) TSA_NO_THREAD_SAFETY_ANALYSIS
{
QueryMetricLogStatus query_status;
QueryMetricLogStatusInfo & info = query_status.info;
info.interval_milliseconds = interval_milliseconds;
info.next_collect_time = start_time;
auto context = getContext();
const auto & process_list = context->getProcessList();
status.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
auto current_time = std::chrono::system_clock::now();
const auto query_info = process_list.getQueryInfo(query_id, false, true, false);
if (!query_info)
{
LOG_TRACE(logger, "Query {} is not running anymore, so we couldn't get its QueryStatusInfo", query_id);
return;
}
auto elem = createLogMetricElement(query_id, *query_info, current_time);
if (elem)
add(std::move(elem.value()));
info.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
collectMetric(process_list, query_id);
});
std::lock_guard lock(queries_mutex);
status.task->scheduleAfter(interval_milliseconds);
queries.emplace(query_id, std::move(status));
LockGuard global_lock(queries_mutex);
query_status.scheduleNext(query_id);
queries.emplace(query_id, std::move(query_status));
}
void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time, QueryStatusInfoPtr query_info)
{
std::unique_lock lock(queries_mutex);
LockGuard global_lock(queries_mutex);
auto it = queries.find(query_id);
/// finishQuery may be called from logExceptionBeforeStart when the query has not even started
@ -124,9 +170,19 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
if (it == queries.end())
return;
auto & query_status = it->second;
decltype(query_status.mutex) query_mutex;
LockGuard query_lock(query_status.getMutex());
/// Move the query mutex here so that we hold it until the end, after removing the query from queries.
query_mutex = std::move(query_status.mutex);
query_status.mutex = {};
global_lock.unlock();
if (query_info)
{
auto elem = createLogMetricElement(query_id, *query_info, finish_time, false);
auto elem = query_status.createLogMetricElement(query_id, *query_info, finish_time, false);
if (elem)
add(std::move(elem.value()));
}
@ -139,51 +195,58 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
/// that order.
{
/// Take ownership of the task so that we can destroy it in this scope after unlocking `queries_mutex`.
auto task = std::move(it->second.task);
auto task = std::move(query_status.info.task);
/// Build an empty task for the old task to make sure it does not lock any mutex on its destruction.
it->second.task = {};
query_status.info.task = {};
query_lock.unlock();
global_lock.lock();
queries.erase(query_id);
/// Ensure `queries_mutex` is unlocked before calling task's destructor at the end of this
/// scope which will lock `exec_mutex`.
lock.unlock();
global_lock.unlock();
}
}
std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next)
void QueryMetricLogStatus::scheduleNext(String query_id)
{
/// fmtlib supports subsecond formatting in 10.0.0. We're in 9.1.0, so we need to add the milliseconds ourselves.
auto seconds = std::chrono::time_point_cast<std::chrono::seconds>(query_info_time);
auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(query_info_time - seconds).count();
LOG_DEBUG(logger, "Collecting query_metric_log for query {} with QueryStatusInfo from {:%Y.%m.%d %H:%M:%S}.{:06}. Schedule next: {}", query_id, seconds, microseconds, schedule_next);
std::unique_lock lock(queries_mutex);
auto query_status_it = queries.find(query_id);
/// The query might have finished while the scheduled task is running.
if (query_status_it == queries.end())
info.next_collect_time += std::chrono::milliseconds(info.interval_milliseconds);
const auto now = std::chrono::system_clock::now();
if (info.next_collect_time > now)
{
lock.unlock();
LOG_TRACE(logger, "Query {} finished already while this collecting task was running", query_id);
return {};
const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(info.next_collect_time - now).count();
info.task->scheduleAfter(wait_time);
}
auto & query_status = query_status_it->second;
if (query_info_time <= query_status.last_collect_time)
else
{
lock.unlock();
LOG_TRACE(logger, "The next collecting task for query {} should have already run at {}. Scheduling it right now",
query_id, timePointToString(info.next_collect_time));
info.task->schedule();
}
}
std::optional<QueryMetricLogElement> QueryMetricLogStatus::createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next)
{
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Collecting query_metric_log for query {} and interval {} ms with QueryStatusInfo from {}. Next collection time: {}",
query_id, info.interval_milliseconds, timePointToString(query_info_time),
schedule_next ? timePointToString(info.next_collect_time + std::chrono::milliseconds(info.interval_milliseconds)) : "finished");
if (query_info_time <= info.last_collect_time)
{
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} has a more recent metrics collected. Skipping this one", query_id);
return {};
}
query_status.last_collect_time = query_info_time;
info.last_collect_time = query_info_time;
QueryMetricLogElement elem;
elem.event_time = timeInSeconds(query_info_time);
elem.event_time_microseconds = timeInMicroseconds(query_info_time);
elem.query_id = query_status_it->first;
elem.query_id = query_id;
elem.memory_usage = query_info.memory_usage > 0 ? query_info.memory_usage : 0;
elem.peak_memory_usage = query_info.peak_memory_usage > 0 ? query_info.peak_memory_usage : 0;
@ -192,7 +255,7 @@ std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(cons
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
const auto & new_value = (*(query_info.profile_counters))[i];
auto & old_value = query_status.last_profile_events[i];
auto & old_value = info.last_profile_events[i];
/// Profile event counters are supposed to be monotonic. However, at least the `NetworkReceiveBytes` can be inaccurate.
/// So, since in the future the counter should always have a bigger value than in the past, we skip this event.
@ -208,16 +271,13 @@ std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(cons
}
else
{
LOG_TRACE(logger, "Query {} has no profile counters", query_id);
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_DEBUG(logger, "Query {} has no profile counters", query_id);
elem.profile_events = std::vector<ProfileEvents::Count>(ProfileEvents::end());
}
if (schedule_next)
{
query_status.next_collect_time += std::chrono::milliseconds(query_status.interval_milliseconds);
const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(query_status.next_collect_time - std::chrono::system_clock::now()).count();
query_status.task->scheduleAfter(wait_time);
}
scheduleNext(query_id);
return elem;
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <base/defines.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Core/BackgroundSchedulePool.h>
@ -11,11 +12,17 @@
#include <chrono>
#include <ctime>
#include <mutex>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
};
/** QueryMetricLogElement is a log of query metric values measured at regular time interval.
*/
@ -34,7 +41,7 @@ struct QueryMetricLogElement
void appendToBlock(MutableColumns & columns) const;
};
struct QueryMetricLogStatus
struct QueryMetricLogStatusInfo
{
UInt64 interval_milliseconds;
std::chrono::system_clock::time_point last_collect_time;
@ -43,24 +50,47 @@ struct QueryMetricLogStatus
BackgroundSchedulePool::TaskHolder task;
};
struct QueryMetricLogStatus
{
using TimePoint = std::chrono::system_clock::time_point;
using Mutex = std::mutex;
QueryMetricLogStatusInfo info TSA_GUARDED_BY(getMutex());
/// We need to be able to move it for the hash map, so we need to add an indirection here.
std::unique_ptr<Mutex> mutex = std::make_unique<Mutex>();
/// Return a reference to the mutex, used for Thread Sanitizer annotations.
Mutex & getMutex() const TSA_RETURN_CAPABILITY(mutex)
{
if (!mutex)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutex cannot be NULL");
return *mutex;
}
void scheduleNext(String query_id) TSA_REQUIRES(getMutex());
std::optional<QueryMetricLogElement> createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next = true) TSA_REQUIRES(getMutex());
};
class QueryMetricLog : public SystemLog<QueryMetricLogElement>
{
using SystemLog<QueryMetricLogElement>::SystemLog;
using TimePoint = std::chrono::system_clock::time_point;
using Base = SystemLog<QueryMetricLogElement>;
public:
using TimePoint = std::chrono::system_clock::time_point;
void shutdown() final;
// Both startQuery and finishQuery are called from the thread that executes the query
/// Both startQuery and finishQuery are called from the thread that executes the query.
void startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds);
void finishQuery(const String & query_id, TimePoint finish_time, QueryStatusInfoPtr query_info = nullptr);
private:
std::optional<QueryMetricLogElement> createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next = true);
void collectMetric(const ProcessList & process_list, String query_id);
std::recursive_mutex queries_mutex;
std::unordered_map<String, QueryMetricLogStatus> queries;
std::mutex queries_mutex;
std::unordered_map<String, QueryMetricLogStatus> queries TSA_GUARDED_BY(queries_mutex);
};
}

View File

@ -161,7 +161,13 @@ void QueryNormalizer::visit(ASTTablesInSelectQueryElement & node, const ASTPtr &
{
auto & join = node.table_join->as<ASTTableJoin &>();
if (join.on_expression)
{
ASTPtr original_on_expression = join.on_expression;
visit(join.on_expression, data);
if (join.on_expression != original_on_expression)
join.children = { join.on_expression };
}
}
}

View File

@ -6,6 +6,12 @@
namespace DB
{
namespace ErrorCode
{
extern const int LOGICAL_ERROR;
}
void RewriteArrayExistsFunctionMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * func = ast->as<ASTFunction>())
@ -20,21 +26,21 @@ void RewriteArrayExistsFunctionMatcher::visit(ASTPtr & ast, Data & data)
if (join->using_expression_list)
{
auto * it = std::find(join->children.begin(), join->children.end(), join->using_expression_list);
if (it == join->children.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find join->using_expression_list in '{}'", join->formatForLogging());
visit(join->using_expression_list, data);
if (it && *it != join->using_expression_list)
*it = join->using_expression_list;
*it = join->using_expression_list;
}
if (join->on_expression)
{
auto * it = std::find(join->children.begin(), join->children.end(), join->on_expression);
if (it == join->children.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find join->on_expression in '{}'", join->formatForLogging());
visit(join->on_expression, data);
if (it && *it != join->on_expression)
*it = join->on_expression;
*it = join->on_expression;
}
}
}

View File

@ -117,7 +117,6 @@ namespace Setting
extern const SettingsOverflowMode join_overflow_mode;
extern const SettingsString log_comment;
extern const SettingsBool log_formatted_queries;
extern const SettingsBool log_processors_profiles;
extern const SettingsBool log_profile_events;
extern const SettingsUInt64 log_queries_cut_to_length;
extern const SettingsBool log_queries;
@ -506,6 +505,7 @@ void logQueryFinish(
auto time_now = std::chrono::system_clock::now();
QueryStatusInfo info = process_list_elem->getInfo(true, settings[Setting::log_profile_events]);
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
elem.type = QueryLogElementType::QUERY_FINISH;
addStatusInfoToQueryLogElement(elem, info, query_ast, context);
@ -551,53 +551,8 @@ void logQueryFinish(
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
logProcessorProfile(context, query_pipeline.getProcessors());
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
}
@ -669,6 +624,7 @@ void logQueryException(
{
elem.query_duration_ms = start_watch.elapsedMilliseconds();
}
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, info);
elem.query_cache_usage = QueryCache::Usage::None;
@ -698,8 +654,6 @@ void logQueryException(
query_span->addAttribute("clickhouse.exception_code", elem.exception_code);
query_span->finish();
}
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, info);
}
void logExceptionBeforeStart(
@ -753,6 +707,8 @@ void logExceptionBeforeStart(
elem.client_info = context->getClientInfo();
logQueryMetricLogFinish(context, false, elem.client_info.current_query_id, std::chrono::system_clock::now(), nullptr);
elem.log_comment = settings[Setting::log_comment];
if (elem.log_comment.size() > settings[Setting::max_query_size])
elem.log_comment.resize(settings[Setting::max_query_size]);
@ -797,8 +753,6 @@ void logExceptionBeforeStart(
ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
}
}
logQueryMetricLogFinish(context, false, elem.client_info.current_query_id, std::chrono::system_clock::now(), nullptr);
}
void validateAnalyzerSettings(ASTPtr ast, bool context_value)

View File

@ -1534,15 +1534,23 @@ static ColumnWithTypeAndName readColumnWithDateData(
for (size_t i = 0; i < orc_int_column->numElements; ++i)
{
Int32 days_num = static_cast<Int32>(orc_int_column->data[i]);
if (check_date_range && (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM || days_num < -DAYNUM_OFFSET_EPOCH))
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" exceeds the range of type Date32",
days_num,
column_name);
if (!orc_int_column->hasNulls || orc_int_column->notNull[i])
{
Int32 days_num = static_cast<Int32>(orc_int_column->data[i]);
if (check_date_range && (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM || days_num < -DAYNUM_OFFSET_EPOCH))
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" exceeds the range of type Date32",
days_num,
column_name);
column_data.push_back(days_num);
column_data.push_back(days_num);
}
else
{
/// ORC library doesn't guarantee that orc_int_column->data[i] is initialized to zero when orc_int_column->notNull[i] is false since https://github.com/ClickHouse/ClickHouse/pull/69473
column_data.push_back(0);
}
}
return {std::move(internal_column), internal_type, column_name};

View File

@ -10,8 +10,8 @@ namespace DB
/**
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster.
* Necessary for code simplification around parallel_distributed_insert_select.
*/
class IStorageCluster : public IStorage
{

View File

@ -29,8 +29,8 @@ namespace MergeTreeSetting
namespace ErrorCodes
{
extern const int REPLICA_IS_ALREADY_ACTIVE;
extern const int REPLICA_STATUS_CHANGED;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
namespace FailPoints
@ -217,26 +217,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
}
else
{
/// Table was created before 20.4 and was never altered,
/// let's initialize replica metadata version from global metadata version.
const String & zookeeper_path = storage.zookeeper_path, & replica_path = storage.replica_path;
Coordination::Stat table_metadata_version_stat;
zookeeper->get(zookeeper_path + "/metadata", &table_metadata_version_stat);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/metadata", table_metadata_version_stat.version));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", toString(table_metadata_version_stat.version), zkutil::CreateMode::Persistent));
Coordination::Responses res;
auto code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZBADVERSION)
throw Exception(ErrorCodes::REPLICA_STATUS_CHANGED, "Failed to initialize metadata_version "
"because table was concurrently altered, will retry");
zkutil::KeeperMultiException::check(code, ops, res);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"It seems you have upgraded from a version earlier than 20.4 straight to one later than 24.10. "
"ClickHouse does not support upgrades that span more than a year. "
"Please update gradually (through intermediate versions).");
}
storage.queue.removeCurrentPartsFromMutations();

View File

@ -619,8 +619,17 @@ bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::Mutabl
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num).second;
int error = 0;
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
/// And remove attaching_ prefix
if (deduplicate && deduplicated)
{
error = ErrorCodes::INSERT_WAS_DEDUPLICATED;
if (!endsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_" + part->name + "/"))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a deduplicated part: {}", part->getDataPartStorage().getRelativePath());
fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info);
part->renameTo(new_relative_path, false);
}
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
return deduplicated;
}
@ -880,8 +889,9 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
}
}
/// Save the current temporary path in case we need to revert the change to retry (ZK connection loss)
/// Save the current temporary path and name in case we need to revert the change to retry (ZK connection loss) or in case part is deduplicated.
const String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
const String initial_part_name = part->name;
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
@ -1050,16 +1060,6 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
zkutil::KeeperMultiException::check(multi_code, ops, responses);
}
transaction.rollback();
if (!Coordination::isUserError(multi_code))
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
toString(block_id),
multi_code);
auto failed_op_idx = zkutil::getFailedOpIndex(multi_code, responses);
String failed_op_path = ops[failed_op_idx]->getPath();
@ -1069,6 +1069,11 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
LOG_INFO(log, "Block with ID {} already exists (it was just appeared) for part {}. Ignore it.",
toString(block_id), part->name);
transaction.rollbackPartsToTemporaryState();
part->is_temp = true;
part->setName(initial_part_name);
part->renameTo(temporary_part_relative_path, false);
if constexpr (async_insert)
{
retry_context.conflict_block_ids = std::vector<String>({failed_op_path});
@ -1080,6 +1085,16 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
return CommitRetryContext::DUPLICATED_PART;
}
transaction.rollback();
if (!Coordination::isUserError(multi_code))
throw Exception(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
toString(block_id),
multi_code);
if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED,
"Insert query (for block {}) was canceled by concurrent ALTER PARTITION or TRUNCATE",

View File

@ -517,7 +517,7 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
bool prefer_bigger_buffer_size = impl->isCached();
bool prefer_bigger_buffer_size = read_settings.filesystem_cache_prefer_bigger_buffer_size && impl->isCached();
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(read_settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: read_settings.remote_fs_buffer_size;

View File

@ -747,6 +747,7 @@ ObjectStorageQueueSettings StorageObjectStorageQueue::getSettings() const
/// so let's reconstruct.
ObjectStorageQueueSettings settings;
const auto & table_metadata = getTableMetadata();
settings[ObjectStorageQueueSetting::mode] = table_metadata.mode;
settings[ObjectStorageQueueSetting::after_processing] = table_metadata.after_processing;
settings[ObjectStorageQueueSetting::keeper_path] = zk_path;
settings[ObjectStorageQueueSetting::loading_retries] = table_metadata.loading_retries;
@ -764,6 +765,7 @@ ObjectStorageQueueSettings StorageObjectStorageQueue::getSettings() const
settings[ObjectStorageQueueSetting::max_processed_files_before_commit] = commit_settings.max_processed_files_before_commit;
settings[ObjectStorageQueueSetting::max_processed_rows_before_commit] = commit_settings.max_processed_rows_before_commit;
settings[ObjectStorageQueueSetting::max_processed_bytes_before_commit] = commit_settings.max_processed_bytes_before_commit;
settings[ObjectStorageQueueSetting::max_processing_time_sec_before_commit] = commit_settings.max_processing_time_sec_before_commit;
return settings;
}

View File

@ -245,6 +245,7 @@ namespace
table_join->strictness = JoinStrictness::Semi;
table_join->on_expression = makeASTFunction("equals", makeASTColumn(data_table_id, TimeSeriesColumnNames::ID), makeASTColumn(tags_table_id, TimeSeriesColumnNames::ID));
table_join->children.push_back(table_join->on_expression);
table->table_join = table_join;
auto table_exp = std::make_shared<ASTTableExpression>();

View File

@ -51,7 +51,7 @@ protected:
"corresponding table function",
getName());
/// Evaluate only first argument, everything else will be done Base class
/// Evaluate only first argument, everything else will be done by the Base class
args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context);
/// Cluster name is always the first

View File

@ -798,10 +798,6 @@ def _upload_build_profile_data(
logging.info("Unknown CI logs host, skip uploading build profile data")
return
if not pr_info.number == 0:
logging.info("Skipping uploading build profile data for PRs")
return
instance_type = get_instance_type()
instance_id = get_instance_id()
auth = {
@ -1268,6 +1264,7 @@ def main() -> int:
s3,
pr_info.number,
pr_info.sha,
pr_info.head_ref,
job_report.test_results,
job_report.additional_files,
job_report.check_name or _get_ext_check_name(args.job_name),
@ -1335,6 +1332,7 @@ def main() -> int:
s3,
pr_info.number,
pr_info.sha,
pr_info.head_ref,
job_report.test_results,
job_report.additional_files,
job_report.check_name or _get_ext_check_name(args.job_name),

View File

@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional
import requests
from env_helper import GITHUB_REPOSITORY
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import TestResults
@ -211,17 +212,13 @@ def prepare_tests_results_for_clickhouse(
report_url: str,
check_name: str,
) -> List[dict]:
pull_request_url = "https://github.com/ClickHouse/ClickHouse/commits/master"
base_ref = "master"
head_ref = "master"
base_repo = pr_info.repo_full_name
head_repo = pr_info.repo_full_name
base_ref = pr_info.base_ref
base_repo = pr_info.base_name
head_ref = pr_info.head_ref
head_repo = pr_info.head_name
pull_request_url = f"https://github.com/{GITHUB_REPOSITORY}/commits/{head_ref}"
if pr_info.number != 0:
pull_request_url = pr_info.pr_html_url
base_ref = pr_info.base_ref
base_repo = pr_info.base_name
head_ref = pr_info.head_ref
head_repo = pr_info.head_name
common_properties = {
"pull_request_number": pr_info.number,

View File

@ -315,7 +315,13 @@ def create_ci_report(pr_info: PRInfo, statuses: CommitStatuses) -> str:
)
)
return upload_results(
S3Helper(), pr_info.number, pr_info.sha, test_results, [], CI.StatusNames.CI
S3Helper(),
pr_info.number,
pr_info.sha,
pr_info.head_ref,
test_results,
[],
CI.StatusNames.CI,
)

View File

@ -250,7 +250,9 @@ def main():
s3_helper = S3Helper()
pr_info = PRInfo()
url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
url = upload_results(
s3_helper, pr_info.number, pr_info.sha, pr_info.head_ref, test_results, [], NAME
)
print(f"::notice ::Report url: {url}")

View File

@ -183,7 +183,9 @@ def main():
pr_info = PRInfo()
s3_helper = S3Helper()
url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
url = upload_results(
s3_helper, pr_info.number, pr_info.sha, pr_info.head_ref, test_results, [], NAME
)
print(f"::notice ::Report url: {url}")

438
tests/ci/official_docker.py Normal file
View File

@ -0,0 +1,438 @@
#!/usr/bin/env python
"""Plan:
- Create a PR with regenerated Dockerfiles for each release branch
- Generate `library definition file` as described in
https://github.com/docker-library/official-images/blob/master/README.md#library-definition-files
- Create a PR with it to https://github.com/docker-library/official-images, the file
name will be `library/clickhouse`"""
import argparse
import logging
from dataclasses import dataclass
from os import getpid
from pathlib import Path
from pprint import pformat
from shlex import quote
from shutil import rmtree
from textwrap import fill
from typing import Dict, Iterable, List, Optional, Set
from env_helper import GITHUB_REPOSITORY, IS_CI
from git_helper import GIT_PREFIX, Git, git_runner
from version_helper import (
ClickHouseVersion,
get_supported_versions,
get_tagged_versions,
get_version_from_string,
)
UBUNTU_NAMES = {
"20.04": "focal",
"22.04": "jammy",
}
if not IS_CI:
GIT_PREFIX = "git"
DOCKER_LIBRARY_REPOSITORY = "ClickHouse/docker-library"
DOCKER_LIBRARY_NAME = {"server": "clickhouse"}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="The script to handle tasks for docker-library/official-images",
)
global_args = argparse.ArgumentParser(add_help=False)
global_args.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="set the script verbosity, could be used multiple",
)
global_args.add_argument(
"--directory",
type=Path,
default=Path("docker/official"),
help="a relative to the reporitory root directory",
)
global_args.add_argument(
"--image-type",
choices=["server", "keeper"],
default="server",
help="which image type to process",
)
global_args.add_argument(
"--commit",
action="store_true",
help="if set, the directory `docker/official` will be staged and committed "
"after generating",
)
dockerfile_glob = "Dockerfile.*"
global_args.add_argument(
"--dockerfile-glob",
default=dockerfile_glob,
help="a glob to collect Dockerfiles in the server of keeper directory",
)
subparsers = parser.add_subparsers(
title="commands", dest="command", required=True, help="the command to run"
)
parser_tree = subparsers.add_parser(
"generate-tree",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
help="generates directory `docker/official`",
parents=[global_args],
)
parser_tree.add_argument(
"--min-version",
type=get_version_from_string,
default=None,
help="if not set, only currently supported versions will be used",
)
parser_tree.add_argument(
"--docker-branch",
default="",
help="if set, the branch to get the content of `docker/server` directory. When "
"unset, the content of directories is taken from release tags",
)
parser_tree.add_argument(
"--build-images",
action="store_true",
help="if set, the image will be built for each Dockerfile '--dockerfile-glob' "
"in the result dirs",
)
parser_tree.add_argument("--clean", default=True, help=argparse.SUPPRESS)
parser_tree.add_argument(
"--no-clean",
dest="clean",
action="store_false",
default=argparse.SUPPRESS,
help="if set, the directory `docker/official` won't be cleaned "
"before generating",
)
parser_tree.add_argument(
"--fetch-tags",
action="store_true",
help="if set, the tags will be updated before run",
)
parser_ldf = subparsers.add_parser(
"generate-ldf",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
help="generate docker library definition file",
parents=[global_args],
)
parser_ldf.add_argument("--check-changed", default=True, help=argparse.SUPPRESS)
parser_ldf.add_argument(
"--no-check-changed",
dest="check_changed",
action="store_false",
default=argparse.SUPPRESS,
help="if set, the directory `docker/official` won't be checked to be "
"uncommitted",
)
args = parser.parse_args()
return args
def get_versions_greater(minimal: ClickHouseVersion) -> Set[ClickHouseVersion]:
"Get the latest patch version for each major.minor >= minimal"
supported = {} # type: Dict[str, ClickHouseVersion]
versions = get_tagged_versions()
for v in versions:
if v < minimal or not v.is_supported:
continue
txt = f"{v.major}.{v.minor}"
if txt in supported:
supported[txt] = max(v, supported[txt])
continue
supported[txt] = v
return set(supported.values())
def create_versions_dirs(
versions: Iterable[ClickHouseVersion], directory: Path
) -> Dict[ClickHouseVersion, Path]:
assert directory.is_dir() or not directory.exists()
dirs = {}
for v in versions:
version_dir = directory / str(v)
version_dir.mkdir(parents=True, exist_ok=True)
logging.debug("Directory %s for version %s is created", version_dir, v)
dirs[v] = version_dir
return dirs
def generate_docker_directories(
version_dirs: Dict[ClickHouseVersion, Path],
docker_branch: str,
dockerfile_glob: str,
build_images: bool = False,
image_type: str = "server",
) -> None:
arg_version = "ARG VERSION="
start_filter = "#docker-official-library:off"
stop_filter = "#docker-official-library:on"
for version, directory in version_dirs.items():
branch = docker_branch or version.describe
docker_source = f"{branch}:docker/{image_type}"
logging.debug(
"Unpack directory content from '%s' to %s",
docker_source,
directory,
)
# We ignore README* files
git_runner(
f"git archive {docker_source} | tar --exclude='README*' -x -C {directory}"
)
for df in directory.glob(dockerfile_glob):
original_content = df.read_text().splitlines()
content = []
filtering = False
for line in original_content:
# Change the ARG VERSION= to a proper version
if line.startswith(arg_version):
logging.debug(
"Found '%s' in line %s:%s, setting to version %s",
arg_version,
df.name,
original_content.index(line),
version,
)
content.append(f'{arg_version}"{version}"')
continue
# Filter out CI-related part from official docker
if line == start_filter:
filtering = True
continue
if line == stop_filter:
filtering = False
continue
if not filtering:
content.append(line)
df.write_text("\n".join(content) + "\n")
if build_images:
git_runner(
f"docker build --network=host -t '{DOCKER_LIBRARY_NAME[image_type]}:"
f"{version}{df.suffix}' -f '{df}' --progress plain '{directory}'"
)
def path_is_changed(path: Path) -> bool:
"checks if `path` has uncommitted changes"
logging.info("Checking if the path %s is changed", path)
path_dir = path.parent
return bool(git_runner(f"git -C {path_dir} status --porcelain -- '{path}'"))
def get_cmdline(width: int = 80) -> str:
"Returns the cmdline split by words with given maximum width"
cmdline = " ".join(
quote(arg)
for arg in Path(f"/proc/{getpid()}/cmdline")
.read_text(encoding="utf-8")
.split("\x00")[:-1]
)
if width <= 2:
return cmdline
return " \\\n".join(
fill(
cmdline,
break_long_words=False,
break_on_hyphens=False,
subsequent_indent=r" ",
width=width - 2,
).split("\n")
)
def generate_tree(args: argparse.Namespace) -> None:
if args.fetch_tags:
# Fetch all tags to not miss the latest versions
git_runner(f"{GIT_PREFIX} fetch --tags --no-recurse-submodules")
if args.min_version:
versions = get_versions_greater(args.min_version)
else:
versions = get_supported_versions()
logging.info(
"The versions to generate:\n %s",
"\n ".join(v.string for v in sorted(versions)),
)
directory = Path(git_runner.cwd) / args.directory / args.image_type
if args.clean:
try:
logging.info("Removing directory %s before generating", directory)
rmtree(directory)
except FileNotFoundError:
pass
version_dirs = create_versions_dirs(versions, directory)
generate_docker_directories(
version_dirs,
args.docker_branch,
args.dockerfile_glob,
args.build_images,
args.image_type,
)
if args.commit and path_is_changed(directory):
logging.info("Staging and committing content of %s", directory)
git_runner(f"git -C {directory} add {directory}")
commit_message = "\n".join(
(
"Re-/Generated tags for official docker library",
"",
"The changed versions:",
*(f" {v.string}" for v in sorted(versions)),
f"The old images were removed: {args.clean}",
"The directory is generated and committed as following:",
f"{get_cmdline()}",
)
)
git_runner(f"{GIT_PREFIX} -C {directory} commit -F -", input=commit_message)
@dataclass
class TagAttrs:
"""A mutable metadata to preserve between generating tags for different versions"""
# Only one latest can exist
latest: ClickHouseVersion
# Only one can be a major one (the most fresh per a year)
majors: Dict[int, ClickHouseVersion]
# Only one lts version can exist
lts: Optional[ClickHouseVersion]
def ldf_header(git: Git, directory: Path) -> List[str]:
"Generates the header for LDF"
script_path = Path(__file__).relative_to(git.root)
script_sha = git_runner(f"git log -1 --format=format:%H -- {script_path}")
repo = f"https://github.com/{GITHUB_REPOSITORY}"
dl_repo = f"https://github.com/{DOCKER_LIBRARY_REPOSITORY}"
fetch_commit = git_runner(
f"git -C {directory} log -1 --format=format:%H -- {directory}"
)
dl_branch = git_runner(f"git -C {directory} branch --show-current")
return [
f"# The file is generated by {repo}/blob/{script_sha}/{script_path}",
"",
"Maintainers: Misha f. Shiryaev <felixoid@clickhouse.com> (@Felixoid),",
" Max Kainov <max.kainov@clickhouse.com> (@mkaynov),",
" Alexander Sapin <alesapin@clickhouse.com> (@alesapin)",
f"GitRepo: {dl_repo}.git",
f"GitFetch: refs/heads/{dl_branch}",
f"GitCommit: {fetch_commit}",
]
def ldf_tags(version: ClickHouseVersion, distro: str, tag_attrs: TagAttrs) -> str:
"""returns the string 'Tags: coma, separated, tags'"""
tags = []
# without_distro shows that it's the default tags set, without `-jammy` suffix
without_distro = distro in UBUNTU_NAMES.values()
if version == tag_attrs.latest:
if without_distro:
tags.append("latest")
tags.append(distro)
# The current version gets the `lts` tag when it's the first met version.is_lts
with_lts = tag_attrs.lts in (None, version) and version.is_lts
if with_lts:
tag_attrs.lts = version
if without_distro:
tags.append("lts")
tags.append(f"lts-{distro}")
# If the tag `22`, `23`, `24` etc. should be included in the tags
with_major = tag_attrs.majors.get(version.major) in (None, version)
if with_major:
tag_attrs.majors[version.major] = version
if without_distro:
tags.append(f"{version.major}")
tags.append(f"{version.major}-{distro}")
# Add all normal tags
for tag in (
f"{version.major}.{version.minor}",
f"{version.major}.{version.minor}.{version.patch}",
f"{version.major}.{version.minor}.{version.patch}.{version.tweak}",
):
if without_distro:
tags.append(tag)
tags.append(f"{tag}-{distro}")
return f"Tags: {', '.join(tags)}"
def generate_ldf(args: argparse.Namespace) -> None:
"""Collect all args.dockerfile_glob files from args.directory and generate the
Library Definition File, read about it in
https://github.com/docker-library/official-images/?tab=readme-ov-file#library-definition-files
"""
directory = Path(git_runner.cwd) / args.directory / args.image_type
versions = sorted([get_version_from_string(d.name) for d in directory.iterdir()])
assert versions, "There are no directories to generate the LDF"
if args.check_changed:
assert not path_is_changed(
directory
), f"There are uncommitted changes in {directory}"
git = Git(True)
# Support a few repositories, get the git-root for images directory
dir_git_root = (
args.directory / git_runner(f"git -C {args.directory} rev-parse --show-cdup")
).absolute()
lines = ldf_header(git, directory)
tag_attrs = TagAttrs(versions[-1], {}, None)
# We iterate from the most recent to the oldest version
for version in reversed(versions):
tag_dir = directory / str(version)
for file in tag_dir.glob(args.dockerfile_glob):
lines.append("")
distro = file.suffix[1:]
if distro == "ubuntu":
# replace 'ubuntu' by the release name from UBUNTU_NAMES
with open(file, "r", encoding="utf-8") as fd:
for l in fd:
if l.startswith("FROM ubuntu:"):
ubuntu_version = l.split(":")[-1].strip()
distro = UBUNTU_NAMES[ubuntu_version]
break
lines.append(ldf_tags(version, distro, tag_attrs))
lines.append("Architectures: amd64, arm64v8")
lines.append(f"Directory: {tag_dir.relative_to(dir_git_root)}")
lines.append(f"File: {file.name}")
# For the last '\n' in join
lines.append("")
ldf_file = (
Path(git_runner.cwd) / args.directory / DOCKER_LIBRARY_NAME[args.image_type]
)
ldf_file.write_text("\n".join(lines))
logging.info("The content of LDF file:\n%s", "\n".join(lines))
if args.commit and path_is_changed(ldf_file):
logging.info("Starting committing or %s file", ldf_file)
ldf_dir = ldf_file.parent
git_runner(f"git -C {ldf_dir} add {ldf_file}")
commit_message = (
f"Re-/Generated docker LDF for {args.image_type} image\n\n"
f"The file is generated and committed as following:\n{get_cmdline()}"
)
git_runner(f"{GIT_PREFIX} -C {ldf_dir} commit -F -", input=commit_message)
def main() -> None:
args = parse_args()
log_levels = [logging.CRITICAL, logging.WARN, logging.INFO, logging.DEBUG]
logging.basicConfig(level=log_levels[min(args.verbose, 3)])
logging.debug("Arguments are %s", pformat(args.__dict__))
if args.command == "generate-tree":
generate_tree(args)
elif args.command == "generate-ldf":
generate_ldf(args)
if __name__ == "__main__":
main()

View File

@ -132,6 +132,12 @@ class PRInfo:
ref = github_event.get("ref", "refs/heads/master")
if ref and ref.startswith("refs/heads/"):
ref = ref[11:]
# Default values
self.base_ref = "" # type: str
self.base_name = "" # type: str
self.head_ref = "" # type: str
self.head_name = "" # type: str
self.number = 0 # type: int
# workflow completed event, used for PRs only
if "action" in github_event and github_event["action"] == "completed":
@ -146,7 +152,7 @@ class PRInfo:
if "pull_request" in github_event: # pull request and other similar events
self.event_type = EventType.PULL_REQUEST
self.number = github_event["pull_request"]["number"] # type: int
self.number = github_event["pull_request"]["number"]
if pr_event_from_api:
try:
response = get_gh_api(
@ -172,17 +178,13 @@ class PRInfo:
self.pr_html_url = f"{repo_prefix}/pull/{self.number}"
# master or backport/xx.x/xxxxx - where the PR will be merged
self.base_ref = github_event["pull_request"]["base"]["ref"] # type: str
self.base_ref = github_event["pull_request"]["base"]["ref"]
# ClickHouse/ClickHouse
self.base_name = github_event["pull_request"]["base"]["repo"][
"full_name"
] # type: str
self.base_name = github_event["pull_request"]["base"]["repo"]["full_name"]
# any_branch-name - the name of working branch name
self.head_ref = github_event["pull_request"]["head"]["ref"] # type: str
self.head_ref = github_event["pull_request"]["head"]["ref"]
# UserName/ClickHouse or ClickHouse/ClickHouse
self.head_name = github_event["pull_request"]["head"]["repo"][
"full_name"
] # type: str
self.head_name = github_event["pull_request"]["head"]["repo"]["full_name"]
self.body = github_event["pull_request"]["body"]
self.labels = {
label["name"] for label in github_event["pull_request"]["labels"]

View File

@ -64,6 +64,7 @@ def upload_results(
s3_client: S3Helper,
pr_number: int,
commit_sha: str,
branch_name: str,
test_results: TestResults,
additional_files: Union[Sequence[Path], Sequence[str]],
check_name: str,
@ -80,8 +81,7 @@ def upload_results(
process_logs(s3_client, additional_files, s3_path_prefix, test_results)
)
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/commits/master"
branch_name = "master"
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/commits/{branch_name}"
if pr_number != 0:
branch_name = f"PR #{pr_number}"
branch_url = f"{GITHUB_SERVER_URL}/{GITHUB_REPOSITORY}/pull/{pr_number}"

View File

@ -164,6 +164,11 @@ class ClickHouseVersion:
"""our X.3 and X.8 are LTS"""
return self.minor % 5 == 3
@property
def is_supported(self) -> bool:
"we can support only versions with VersionType STABLE or LTS"
return self.description in (VersionType.STABLE, VersionType.LTS)
def get_stable_release_type(self) -> str:
if self.is_lts:
return VersionType.LTS
@ -365,7 +370,7 @@ def get_supported_versions(
versions = list(versions)
else:
# checks that repo is not shallow in background
versions = get_tagged_versions()
versions = [v for v in get_tagged_versions() if v.is_supported]
versions.sort()
versions.reverse()
for version in versions:

View File

@ -5,7 +5,8 @@ source /repo/tests/docker_scripts/utils.lib
function attach_gdb_to_clickhouse()
{
IS_ASAN=$(clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
# Use retries to avoid failures due to fault injections
IS_ASAN=$(run_with_retry 5 clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"

View File

@ -744,11 +744,13 @@ class ClickHouseCluster:
# available when with_prometheus == True
self.with_prometheus = False
self.prometheus_writer_host = "prometheus_writer"
self.prometheus_writer_ip = None
self.prometheus_writer_port = 9090
self.prometheus_writer_logs_dir = p.abspath(
p.join(self.instances_dir, "prometheus_writer/logs")
)
self.prometheus_reader_host = "prometheus_reader"
self.prometheus_reader_ip = None
self.prometheus_reader_port = 9091
self.prometheus_reader_logs_dir = p.abspath(
p.join(self.instances_dir, "prometheus_reader/logs")
@ -2728,6 +2730,16 @@ class ClickHouseCluster:
raise Exception("Can't wait LDAP to start")
def wait_prometheus_to_start(self):
self.prometheus_reader_ip = self.get_instance_ip(self.prometheus_reader_host)
self.prometheus_writer_ip = self.get_instance_ip(self.prometheus_writer_host)
self.wait_for_url(
f"http://{self.prometheus_reader_ip}:{self.prometheus_reader_port}/api/v1/query?query=time()"
)
self.wait_for_url(
f"http://{self.prometheus_writer_ip}:{self.prometheus_writer_port}/api/v1/query?query=time()"
)
def start(self):
pytest_xdist_logging_to_separate_files.setup()
logging.info("Running tests in {}".format(self.base_path))
@ -3083,12 +3095,23 @@ class ClickHouseCluster:
f"http://{self.jdbc_bridge_ip}:{self.jdbc_bridge_port}/ping"
)
if self.with_prometheus:
if self.with_prometheus and self.base_prometheus_cmd:
os.makedirs(self.prometheus_writer_logs_dir)
os.chmod(self.prometheus_writer_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
os.makedirs(self.prometheus_reader_logs_dir)
os.chmod(self.prometheus_reader_logs_dir, stat.S_IRWXU | stat.S_IRWXO)
prometheus_start_cmd = self.base_prometheus_cmd + common_opts
logging.info(
"Trying to create Prometheus instances by command %s",
" ".join(map(str, prometheus_start_cmd)),
)
run_and_check(prometheus_start_cmd)
self.up_called = True
logging.info("Trying to connect to Prometheus...")
self.wait_prometheus_to_start()
clickhouse_start_cmd = self.base_cmd + ["up", "-d", "--no-recreate"]
logging.debug(
(

View File

@ -0,0 +1,88 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
with_zookeeper=True,
macros={"replica": "node1"},
stay_alive=True,
)
database_name = "dedup_attach"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(query):
return ch1.query(database=database_name, sql=query)
def test_deduplicated_attached_part_renamed_after_attach(started_cluster):
ch1.query(f"CREATE DATABASE {database_name}")
q(
"CREATE TABLE dedup (id UInt32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/dedup_attach/dedup/s1', 'r1') ORDER BY id;"
)
q("INSERT INTO dedup VALUES (1),(2),(3);")
table_data_path = q(
"SELECT data_paths FROM system.tables WHERE database=currentDatabase() AND table='dedup'"
).strip("'[]\n")
ch1.exec_in_container(
[
"bash",
"-c",
f"cp -r {table_data_path}/all_0_0_0 {table_data_path}/detached/all_0_0_0",
]
)
# Part is attached as all_1_1_0
q("ALTER TABLE dedup ATTACH PART 'all_0_0_0'")
assert 2 == int(
q(
f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
ch1.exec_in_container(
[
"bash",
"-c",
f"cp -r {table_data_path}/all_1_1_0 {table_data_path}/detached/all_1_1_0",
]
)
# Part is deduplicated and not attached
q("ALTER TABLE dedup ATTACH PART 'all_1_1_0'")
assert 2 == int(
q(
f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
assert 1 == int(
q(
f"SELECT count() FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
# Check that it is not 'attaching_all_1_1_0'
assert (
"all_1_1_0"
== q(
f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'"
).strip()
)
q("DROP TABLE dedup")
q("SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/dedup_attach/dedup/s1'")
ch1.query(f"DROP DATABASE {database_name}")

View File

@ -20,7 +20,7 @@ node = cluster.add_instance(
def execute_query_on_prometheus_writer(query, timestamp):
return execute_query_impl(
cluster.get_instance_ip(cluster.prometheus_writer_host),
cluster.prometheus_writer_ip,
cluster.prometheus_writer_port,
"/api/v1/query",
query,
@ -30,7 +30,7 @@ def execute_query_on_prometheus_writer(query, timestamp):
def execute_query_on_prometheus_reader(query, timestamp):
return execute_query_impl(
cluster.get_instance_ip(cluster.prometheus_reader_host),
cluster.prometheus_reader_ip,
cluster.prometheus_reader_port,
"/api/v1/query",
query,

View File

@ -5,7 +5,7 @@ SET max_block_size = 1000;
SET max_memory_usage = 1000000000;
INSERT INTO size_hint SELECT arrayMap(x -> 'Hello', range(1000)) FROM numbers(10000);
SET max_memory_usage = 100000000, max_threads = 2;
SET max_memory_usage = 105000000, max_threads = 2;
SELECT count(), sum(length(s)) FROM size_hint;
DROP TABLE size_hint;

View File

@ -3,3 +3,10 @@
3 2023-03-16 12:22:33 2023-03-16 10:22:33.000 2023-03-16 03:22:33 2023-03-16 19:22:33.123
2024-02-24 10:22:33 2024-02-24 12:22:33
2024-10-24 09:22:33 2024-10-24 13:22:33
2024-10-24 16:22:33 2024-10-24 06:22:33
leap year: 2024-02-29 16:22:33 2024-02-29 06:22:33
non-leap year: 2023-03-01 16:22:33 2023-03-01 06:22:33
leap year: 2024-02-29 04:22:33 2024-02-29 19:22:33
non-leap year: 2023-03-01 04:22:33 2023-02-28 19:22:33
timezone with half-hour offset: 2024-02-29 00:52:33 2024-02-29 21:52:33
jump over a year: 2024-01-01 04:01:01 2023-12-31 20:01:01

View File

@ -15,4 +15,13 @@ $CLICKHOUSE_CLIENT -q "select x, to_utc_timestamp(toDateTime('2023-03-16 11:22:3
# timestamp convert between DST timezone and UTC
$CLICKHOUSE_CLIENT -q "select to_utc_timestamp(toDateTime('2024-02-24 11:22:33'), 'Europe/Madrid'), from_utc_timestamp(toDateTime('2024-02-24 11:22:33'), 'Europe/Madrid')"
$CLICKHOUSE_CLIENT -q "select to_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'Europe/Madrid'), from_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'Europe/Madrid')"
$CLICKHOUSE_CLIENT -q "select to_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'EST'), from_utc_timestamp(toDateTime('2024-10-24 11:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'leap year:', to_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'EST'), from_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'non-leap year:', to_utc_timestamp(toDateTime('2023-02-29 11:22:33'), 'EST'), from_utc_timestamp(toDateTime('2023-02-29 11:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'leap year:', to_utc_timestamp(toDateTime('2024-02-28 23:22:33'), 'EST'), from_utc_timestamp(toDateTime('2024-03-01 00:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'non-leap year:', to_utc_timestamp(toDateTime('2023-02-28 23:22:33'), 'EST'), from_utc_timestamp(toDateTime('2023-03-01 00:22:33'), 'EST')"
$CLICKHOUSE_CLIENT -q "select 'timezone with half-hour offset:', to_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'Australia/Adelaide'), from_utc_timestamp(toDateTime('2024-02-29 11:22:33'), 'Australia/Adelaide')"
$CLICKHOUSE_CLIENT -q "select 'jump over a year:', to_utc_timestamp(toDateTime('2023-12-31 23:01:01'), 'EST'), from_utc_timestamp(toDateTime('2024-01-01 01:01:01'), 'EST')"
$CLICKHOUSE_CLIENT -q "drop table test_tbl"

View File

@ -83,7 +83,7 @@ QUERY id: 0
FUNCTION id: 4, function_name: tuple, function_type: ordinary, result_type: Tuple(Nullable(UInt64))
ARGUMENTS
LIST id: 5, nodes: 1
FUNCTION id: 6, function_name: sum, function_type: aggregate, nulls_action : IGNORE_NULLS, result_type: Nullable(UInt64)
FUNCTION id: 6, function_name: sum, function_type: aggregate, result_type: Nullable(UInt64)
ARGUMENTS
LIST id: 7, nodes: 1
FUNCTION id: 8, function_name: if, function_type: ordinary, result_type: Nullable(UInt8)

View File

@ -23,8 +23,8 @@
--Interval 123: check that the SleepFunctionCalls, SleepFunctionMilliseconds and ProfileEvent_SleepFunctionElapsedMicroseconds are correct
1
--Check that a query_metric_log_interval=0 disables the collection
0
1
-Check that a query which execution time is less than query_metric_log_interval is never collected
0
1
--Check that there is a final event when queries finish
3
1

View File

@ -84,17 +84,17 @@ check_log 123
# query_metric_log_interval=0 disables the collection altogether
$CLICKHOUSE_CLIENT -m -q """
SELECT '--Check that a query_metric_log_interval=0 disables the collection';
SELECT count() FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_0'
SELECT count() == 0 FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_0'
"""
# a quick query that takes less than query_metric_log_interval is never collected
$CLICKHOUSE_CLIENT -m -q """
SELECT '-Check that a query which execution time is less than query_metric_log_interval is never collected';
SELECT count() FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_fast'
SELECT count() == 0 FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_fast'
"""
# a query that takes more than query_metric_log_interval is collected including the final row
$CLICKHOUSE_CLIENT -m -q """
SELECT '--Check that there is a final event when queries finish';
SELECT count() FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_1000'
SELECT count() > 2 FROM system.query_metric_log WHERE event_date >= yesterday() AND query_id = '${query_prefix}_1000'
"""

View File

@ -0,0 +1,51 @@
#!/usr/bin/env -S ${HOME}/clickhouse-client --queries-file
DROP TABLE IF EXISTS with_fill_date__fuzz_0;
CREATE TABLE with_fill_date__fuzz_0
(
`d` Date,
`d32` Nullable(Int32),
`d33` Int32
)
ENGINE = Memory;
INSERT INTO with_fill_date__fuzz_0 VALUES (toDate('2020-03-03'), 1, 3), (toDate('2020-03-03'), NULL, 3), (toDate('2020-02-05'), 1, 1);
SELECT count()
FROM with_fill_date__fuzz_0
ORDER BY
count(),
count() IGNORE NULLS,
max(d)
WITH FILL STEP toIntervalDay(10)
;
SELECT count()
FROM with_fill_date__fuzz_0
ORDER BY
any(d32) RESPECT NULLS,
any_respect_nulls(d32),
max(d)
WITH FILL STEP toIntervalDay(10)
;
SELECT count()
FROM with_fill_date__fuzz_0
ORDER BY
any(d32),
any(d32) IGNORE NULLS,
any(d32) RESPECT NULLS,
any_respect_nulls(d32) IGNORE NULLS,
any_respect_nulls(d32),
sum(d33),
sum(d33) IGNORE NULLS,
max(d)
WITH FILL STEP toIntervalDay(10)
;

View File

@ -41,3 +41,13 @@
[1,2,3,4,5,10,20]
-------
[1,2,3]
-------
[10,-2,1] ['hello','hi'] [3,2,1,NULL]
-------
-------
[1]
-------
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100,101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131,132,133,134,135,136,137,138,139,140,141,142,143,144,145,146,147,148,149,150,151,152,153,154,155,156,157,158,159,160,161,162,163,164,165,166,167,168,169,170,171,172,173,174,175,176,177,178,179,180,181,182,183,184,185,186,187,188,189,190,191,192,193,194,195,196,197,198,199,200,201,202,203,204,205,206,207,208,209,210,211,212,213,214,215,216,217,218,219,220,221,222,223,224,225,226,227,228,229,230,231,232,233,234,235,236,237,238,239,240,241,242,243,244,245,246,247,248,249,250,251,252,253,254,255,256]
199999
-------
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19]

View File

@ -35,4 +35,23 @@ SELECT arraySort(arrayUnion([NULL, NULL, NULL, 1], [1, NULL, NULL], [1, 2, 3, NU
select '-------';
SELECT arraySort(arrayUnion([1, 1, 1, 2, 3], [2, 2, 4], [5, 10, 20]));
select '-------';
SELECT arraySort(arrayUnion([1, 2], [1, 3], [])),
SELECT arraySort(arrayUnion([1, 2], [1, 3], []));
select '-------';
-- example from docs
SELECT
arrayUnion([-2, 1], [10, 1], [-2], []) as num_example,
arrayUnion(['hi'], [], ['hello', 'hi']) as str_example,
arrayUnion([1, 3, NULL], [2, 3, NULL]) as null_example;
select '-------';
--mix of types
SELECT arrayUnion([1], [-2], [1.1, 'hi'], [NULL, 'hello', []]); -- {serverError NO_COMMON_TYPE}
select '-------';
SELECT arrayUnion([1]);
SELECT arrayUnion(); -- {serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH}
select '-------';
--bigger arrays
SELECT arraySort(arrayUnion(range(1, 256), range(2, 257)));
SELECT length(arrayUnion(range(1, 100000), range(9999, 200000)));
select '-------';
--bigger number of arguments
SELECT arraySort(arrayUnion([1, 2], [1, 3], [1, 4], [1, 5], [1, 6], [1, 7], [1, 8], [1, 9], [1, 10], [1, 11], [1, 12], [1, 13], [1, 14], [1, 15], [1, 16], [1, 17], [1, 18], [1, 19]));

View File

@ -1,6 +1,6 @@
quantileExactWeightedInterpolated
0 0 0 Decimal(38, 8)
-25.5 -8.49999999 -5.1 Decimal(38, 8)
0 0 0 25 2024-02-20 Decimal(38, 8)
-25.5 -8.49999999 -5.1 12.25 2024-01-25 Decimal(38, 8)
0 0 0
10 3.33333333 2
20 6.66666666 4
@ -10,11 +10,14 @@ quantileExactWeightedInterpolated
[-50,-40,-30,-20,-10,0,10,20,30,40,50]
[-16.66666666,-13.33333333,-10,-6.66666666,-3.33333333,0,3.33333333,6.66666666,10,13.33333333,16.66666666]
[-10,-8,-6,-4,-2,0,2,4,6,8,10]
[0,5,10,15,20,25,30,35,40,45,50]
['2024-01-01','2024-01-11','2024-01-21','2024-01-31','2024-02-10','2024-02-20','2024-03-01','2024-03-11','2024-03-21','2024-03-31','2024-04-10']
quantileExactWeightedInterpolatedState
[10000.6,20000.2,29999.8,39999.4]
Test with filter that returns no rows
0 0 0
0 0 0 nan 1970-01-01
0 0 0 nan 1970-01-01
Test with dynamic weights
21 7 4.2
21 7 4.2 35.5 2024-03-12
Test with all weights set to 0
0 0 0
0 0 0 nan 1970-01-01

View File

@ -5,16 +5,28 @@ CREATE TABLE decimal
a Decimal32(4),
b Decimal64(8),
c Decimal128(8),
f Float64,
d Date,
w UInt64
) ENGINE = Memory;
INSERT INTO decimal (a, b, c, w)
SELECT toDecimal32(number - 50, 4), toDecimal64(number - 50, 8) / 3, toDecimal128(number - 50, 8) / 5, number
INSERT INTO decimal (a, b, c, f, d, w)
SELECT toDecimal32(number - 50, 4), toDecimal64(number - 50, 8) / 3, toDecimal128(number - 50, 8) / 5, number/2, addDays(toDate('2024-01-01'), number), number
FROM system.numbers LIMIT 101;
SELECT 'quantileExactWeightedInterpolated';
SELECT medianExactWeightedInterpolated(a, 1), medianExactWeightedInterpolated(b, 2), medianExactWeightedInterpolated(c, 3) as x, toTypeName(x) FROM decimal;
SELECT quantileExactWeightedInterpolated(a, 1), quantileExactWeightedInterpolated(b, 2), quantileExactWeightedInterpolated(c, 3) as x, toTypeName(x) FROM decimal WHERE a < 0;
SELECT medianExactWeightedInterpolated(a, 1),
medianExactWeightedInterpolated(b, 2),
medianExactWeightedInterpolated(c, 3) as x,
medianExactWeightedInterpolated(f, 4),
medianExactWeightedInterpolated(d, 5),
toTypeName(x) FROM decimal;
SELECT quantileExactWeightedInterpolated(a, 1),
quantileExactWeightedInterpolated(b, 2),
quantileExactWeightedInterpolated(c, 3) as x,
quantileExactWeightedInterpolated(f, 4),
quantileExactWeightedInterpolated(d, 5),
toTypeName(x) FROM decimal WHERE a < 0;
SELECT quantileExactWeightedInterpolated(0.0)(a, 1), quantileExactWeightedInterpolated(0.0)(b, 2), quantileExactWeightedInterpolated(0.0)(c, 3) FROM decimal WHERE a >= 0;
SELECT quantileExactWeightedInterpolated(0.2)(a, 1), quantileExactWeightedInterpolated(0.2)(b, 2), quantileExactWeightedInterpolated(0.2)(c, 3) FROM decimal WHERE a >= 0;
SELECT quantileExactWeightedInterpolated(0.4)(a, 1), quantileExactWeightedInterpolated(0.4)(b, 2), quantileExactWeightedInterpolated(0.4)(c, 3) FROM decimal WHERE a >= 0;
@ -24,6 +36,8 @@ SELECT quantileExactWeightedInterpolated(1.0)(a, 1), quantileExactWeightedInterp
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(a, 1) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(b, 2) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(c, 3) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(f, 4) FROM decimal;
SELECT quantilesExactWeightedInterpolated(0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)(d, 5) FROM decimal;
SELECT 'quantileExactWeightedInterpolatedState';
SELECT quantilesExactWeightedInterpolatedMerge(0.2, 0.4, 0.6, 0.8)(x)
@ -34,12 +48,29 @@ FROM
);
SELECT 'Test with filter that returns no rows';
SELECT medianExactWeightedInterpolated(a, 1), medianExactWeightedInterpolated(b, 2), medianExactWeightedInterpolated(c, 3) FROM decimal WHERE a > 1000;
SELECT medianExactWeightedInterpolated(a, 1),
medianExactWeightedInterpolated(b, 2),
medianExactWeightedInterpolated(c, 3),
medianExactWeightedInterpolated(f, 4),
medianExactWeightedInterpolated(d, 5) FROM decimal WHERE a > 1000;
SELECT quantileExactWeightedInterpolated(a, 1),
quantileExactWeightedInterpolated(b, 2),
quantileExactWeightedInterpolated(c, 3),
quantileExactWeightedInterpolated(f, 4),
quantileExactWeightedInterpolated(d, 5) FROM decimal WHERE d < toDate('2024-01-01');
SELECT 'Test with dynamic weights';
SELECT medianExactWeightedInterpolated(a, w), medianExactWeightedInterpolated(b, w), medianExactWeightedInterpolated(c, w) FROM decimal;
SELECT medianExactWeightedInterpolated(a, w),
medianExactWeightedInterpolated(b, w),
medianExactWeightedInterpolated(c, w),
medianExactWeightedInterpolated(f, w),
medianExactWeightedInterpolated(d, w) FROM decimal;
SELECT 'Test with all weights set to 0';
SELECT medianExactWeightedInterpolated(a, 0), medianExactWeightedInterpolated(b, 0), medianExactWeightedInterpolated(c, 0) FROM decimal;
SELECT medianExactWeightedInterpolated(a, 0),
medianExactWeightedInterpolated(b, 0),
medianExactWeightedInterpolated(c, 0),
medianExactWeightedInterpolated(f, 0),
medianExactWeightedInterpolated(d, 0) FROM decimal;
DROP TABLE IF EXISTS decimal;

View File

@ -0,0 +1,12 @@
number Nullable(Int64)
date_field Nullable(Date32)
\N
1970-01-02
\N
1970-01-04
\N
1970-01-06
\N
1970-01-08
\N
1970-01-10

View File

@ -0,0 +1,14 @@
-- Tags: no-fasttest, no-parallel
SET session_timezone = 'UTC';
SET engine_file_truncate_on_insert = 1;
insert into function file('03259.orc', 'ORC')
select
number,
if (number % 2 = 0, null, toDate32(number)) as date_field
from numbers(10);
desc file('03259.orc', 'ORC');
select date_field from file('03259.orc', 'ORC') order by number;

View File

@ -0,0 +1,7 @@
SET allow_suspicious_low_cardinality_types = 1, allow_experimental_dynamic_type = 1;
DROP TABLE IF EXISTS t0;
CREATE TABLE t0 (c0 LowCardinality(Nullable(Int))) ENGINE = Memory();
INSERT INTO TABLE t0 (c0) VALUES (NULL);
SELECT c0::Dynamic FROM t0;
SELECT c0 FROM t0;
DROP TABLE t0;

View File

@ -0,0 +1,10 @@
-- https://github.com/ClickHouse/ClickHouse/issues/71382
DROP TABLE IF EXISTS rewrite;
CREATE TABLE rewrite (c0 Int) ENGINE = Memory();
SELECT 1
FROM rewrite
INNER JOIN rewrite AS y ON (
SELECT 1
)
INNER JOIN rewrite AS z ON 1
SETTINGS optimize_rewrite_array_exists_to_has=1;

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,96 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "
CREATE TABLE t
(
a UInt32,
b UInt32
)
ENGINE = MergeTree
ORDER BY (a, b);
INSERT INTO t SELECT number, number FROM numbers(1000);
"
query_id="03270_processors_profile_log_3_$RANDOM"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SET log_processors_profiles = 1;
WITH
t0 AS
(
SELECT *
FROM numbers(1000)
),
t1 AS
(
SELECT number * 3 AS b
FROM t0
)
SELECT b * 3
FROM t
WHERE a IN (t1)
FORMAT Null;
"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SYSTEM FLUSH LOGS;
SELECT sum(elapsed_us) > 0
FROM system.processors_profile_log
WHERE event_date >= yesterday() AND query_id = '$query_id' AND name = 'CreatingSetsTransform';
"
#####################################################################
$CLICKHOUSE_CLIENT -q "
CREATE TABLE t1
(
st FixedString(54)
)
ENGINE = MergeTree
ORDER BY tuple();
INSERT INTO t1 VALUES
('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRTUVWXYZ'),
('\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'),
('IIIIIIIIII\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0');
"
query_id="03270_processors_profile_log_3_$RANDOM"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SET log_processors_profiles = 1;
SET max_threads=2; -- no merging when max_threads=1
WITH
(
SELECT groupConcat(',')(st)
FROM t1
ORDER BY ALL
) AS a,
(
SELECT groupConcat(',')(CAST(st, 'String'))
FROM t1
ORDER BY ALL
) AS b
SELECT a = b
FORMAT Null;
"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "
SYSTEM FLUSH LOGS;
SELECT sum(elapsed_us) > 0
FROM system.processors_profile_log
WHERE event_date >= yesterday() AND query_id = '$query_id' AND name = 'MergingSortedTransform';
"