Merge branch 'ClickHouse:master' into patch-1

danila-ermakov 2024-03-19 08:38:24 -04:00 committed by GitHub
commit cf9433fdd8
58 changed files with 452 additions and 176 deletions

View File

@ -61,6 +61,18 @@ if [[ -n "$BUGFIX_VALIDATE_CHECK" ]] && [[ "$BUGFIX_VALIDATE_CHECK" -eq 1 ]]; th
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
rm /etc/clickhouse-server/config.d/zero_copy_destructive_operations.xml
#todo: remove these after 24.3 is released.
sudo cat /etc/clickhouse-server/config.d/azure_storage_conf.xml \
| sed "s|<object_storage_type>azure|<object_storage_type>azure_blob_storage|" \
> /etc/clickhouse-server/config.d/azure_storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/azure_storage_conf.xml.tmp /etc/clickhouse-server/config.d/azure_storage_conf.xml
#todo: remove these after 24.3 is released.
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<object_storage_type>local|<object_storage_type>local_blob_storage|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
function remove_keeper_config()
{
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \

View File

@ -1,6 +1,7 @@
#include <Backups/BackupCoordinationRemote.h>
#include <base/hex.h>
#include <boost/algorithm/string/split.hpp>
#include <Access/Common/AccessEntityType.h>
#include <Backups/BackupCoordinationReplicatedAccess.h>

View File

@ -17,6 +17,7 @@
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
#include <filesystem>
namespace fs = std::filesystem;

View File

@ -23,6 +23,9 @@
#include <Interpreters/Context.h>
#include <Common/Macros.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{

View File

@ -190,14 +190,26 @@ public:
{
if (col_haystack_const && col_needle_const)
{
const auto is_col_start_pos_const = !column_start_pos || isColumnConst(*column_start_pos);
auto column_start_position_arg = column_start_pos;
bool is_col_start_pos_const = false;
if (column_start_pos)
{
if (const ColumnConst * const_column_start_pos = typeid_cast<const ColumnConst *>(&*column_start_pos))
{
is_col_start_pos_const = true;
column_start_position_arg = const_column_start_pos->getDataColumnPtr();
}
}
else
is_col_start_pos_const = true;
vec_res.resize(is_col_start_pos_const ? 1 : column_start_pos->size());
const auto null_map = create_null_map();
Impl::constantConstant(
col_haystack_const->getValue<String>(),
col_needle_const->getValue<String>(),
column_start_pos,
column_start_position_arg,
vec_res,
null_map.get());

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>

View File

@ -44,7 +44,7 @@ struct RepeatImpl
ColumnString::Offsets & res_offsets,
T repeat_time)
{
repeat_time = repeat_time < 0 ? 0 : repeat_time;
repeat_time = repeat_time < 0 ? static_cast<T>(0) : repeat_time;
checkRepeatTime(repeat_time);
UInt64 data_size = 0;
@ -76,7 +76,7 @@ struct RepeatImpl
res_offsets.assign(offsets);
for (UInt64 i = 0; i < col_num.size(); ++i)
{
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
T repeat_time = col_num[i] < 0 ? static_cast<T>(0) : col_num[i];
size_t repeated_size = (offsets[i] - offsets[i - 1] - 1) * repeat_time + 1;
checkStringSize(repeated_size);
data_size += repeated_size;
@ -86,7 +86,7 @@ struct RepeatImpl
for (UInt64 i = 0; i < col_num.size(); ++i)
{
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
T repeat_time = col_num[i] < 0 ? static_cast<T>(0) : col_num[i];
checkRepeatTime(repeat_time);
process(data.data() + offsets[i - 1], res_data.data() + res_offsets[i - 1], offsets[i] - offsets[i - 1], repeat_time);
}
@ -105,7 +105,7 @@ struct RepeatImpl
UInt64 col_size = col_num.size();
for (UInt64 i = 0; i < col_size; ++i)
{
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
T repeat_time = col_num[i] < 0 ? static_cast<T>(0) : col_num[i];
size_t repeated_size = str_size * repeat_time + 1;
checkStringSize(repeated_size);
data_size += repeated_size;
@ -114,7 +114,7 @@ struct RepeatImpl
res_data.resize(data_size);
for (UInt64 i = 0; i < col_size; ++i)
{
T repeat_time = col_num[i] < 0 ? 0 : col_num[i];
T repeat_time = col_num[i] < 0 ? static_cast<T>(0) : col_num[i];
checkRepeatTime(repeat_time);
process(
reinterpret_cast<UInt8 *>(const_cast<char *>(copy_str.data())),
@ -169,8 +169,19 @@ class FunctionRepeat : public IFunction
template <typename F>
static bool castType(const IDataType * type, F && f)
{
return castTypeToEither<DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64,
DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64>(type, std::forward<F>(f));
return castTypeToEither<
DataTypeInt8,
DataTypeInt16,
DataTypeInt32,
DataTypeInt64,
DataTypeInt128,
DataTypeInt256,
DataTypeUInt8,
DataTypeUInt16,
DataTypeUInt32,
DataTypeUInt64,
DataTypeUInt128,
DataTypeUInt256>(type, std::forward<F>(f));
}
public:
@ -208,7 +219,7 @@ public:
if (const ColumnConst * col_num_const = checkAndGetColumn<ColumnConst>(col_num.get()))
{
auto col_res = ColumnString::create();
castType(arguments[1].type.get(), [&](const auto & type)
auto success = castType(arguments[1].type.get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
using T = typename DataType::FieldType;
@ -216,6 +227,11 @@ public:
RepeatImpl::vectorStrConstRepeat(col->getChars(), col->getOffsets(), col_res->getChars(), col_res->getOffsets(), times);
return true;
});
if (!success)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column type {} of 'n' of function {}",
arguments[1].column->getName(), getName());
return col_res;
}
else if (castType(arguments[1].type.get(), [&](const auto & type)
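Note: the hunks above widen the castType dispatch to 128/256-bit integers and turn a silently ignored cast failure into an ILLEGAL_COLUMN error. The following standalone sketch shows the shape of that dispatch with simplified stand-ins (not ClickHouse's IDataType or castTypeToEither): try each listed type, invoke the callback on the first match, and return false so the caller can throw.

#include <cstdint>
#include <iostream>
#include <type_traits>

// Stand-ins for ClickHouse's data type classes (illustration only).
struct IDataType { virtual ~IDataType() = default; };
struct DataTypeUInt8  : IDataType { using FieldType = std::uint8_t;  };
struct DataTypeUInt64 : IDataType { using FieldType = std::uint64_t; };
struct DataTypeInt64  : IDataType { using FieldType = std::int64_t;  };

// Try each candidate type in order; call f with the concrete type on the first
// match and return true, otherwise return false so the caller can raise an error.
template <typename... Ts, typename F>
bool castTypeToEither(const IDataType * type, F && f)
{
    return (... || ([&]
    {
        if (const auto * concrete = dynamic_cast<const Ts *>(type))
            return f(*concrete);
        return false;
    }()));
}

int main()
{
    DataTypeUInt64 t;
    bool matched = castTypeToEither<DataTypeUInt8, DataTypeUInt64, DataTypeInt64>(&t, [](const auto & typed)
    {
        using T = typename std::decay_t<decltype(typed)>::FieldType;
        std::cout << "matched integer type of width " << sizeof(T) * 8 << " bits\n";
        return true;
    });
    if (!matched)
        std::cout << "unsupported type\n";  // the real code throws ILLEGAL_COLUMN here
}

The added "if (!success) throw ..." in the diff is what turns an unmatched repeat-count type into a clear error instead of an empty result column.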

View File

@ -783,6 +783,17 @@ bool FileCache::tryReserve(
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);
assertInitialized();
/// A logical race on cache_is_being_resized is still possible;
/// in that case we will try to lock the cache with a timeout. This is ok: the timeout is small,
/// and since resizing the cache can take a long time, this small chance of a race is
/// acceptable compared to the number of cases this check helps.
if (cache_is_being_resized.load(std::memory_order_relaxed))
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfLockContention);
return false;
}
auto cache_lock = tryLockCache(std::chrono::milliseconds(lock_wait_timeout_milliseconds));
if (!cache_lock)
{
@ -1264,12 +1275,14 @@ std::vector<String> FileCache::tryGetCachePaths(const Key & key)
size_t FileCache::getUsedCacheSize() const
{
return main_priority->getSize(lockCache());
/// We use this method for metrics, so it is ok to get an approximate result.
return main_priority->getSizeApprox();
}
size_t FileCache::getFileSegmentsNum() const
{
return main_priority->getElementsCount(lockCache());
/// We use this method for metrics, so it is ok to get an approximate result.
return main_priority->getElementsCountApprox();
}
void FileCache::assertCacheCorrectness()
@ -1327,8 +1340,12 @@ void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings,
if (new_settings.max_size != actual_settings.max_size
|| new_settings.max_elements != actual_settings.max_elements)
{
auto cache_lock = lockCache();
cache_is_being_resized.store(true, std::memory_order_relaxed);
SCOPE_EXIT({
cache_is_being_resized.store(false, std::memory_order_relaxed);
});
auto cache_lock = lockCache();
bool updated = false;
try
{
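Note: the change above pairs a relaxed atomic flag with SCOPE_EXIT and a lock-with-timeout, so tryReserve() can bail out cheaply while applySettingsIfPossible() resizes the cache; the flag is stored before lockCache() is taken so concurrent reservers see it as early as possible. A self-contained sketch of that pattern using only standard-library types (std::timed_mutex standing in for the cache lock) might look like this:

#include <atomic>
#include <chrono>
#include <mutex>

struct CacheSketch
{
    std::atomic<bool> is_being_resized{false};
    std::timed_mutex cache_mutex;

    bool tryReserve()
    {
        // Cheap early exit: if a resize is in flight, do not contend for the lock.
        if (is_being_resized.load(std::memory_order_relaxed))
            return false;  // the real code also bumps a "failed because of lock contention" profile event

        // Otherwise take the lock with a small timeout, as tryLockCache() does above.
        std::unique_lock lock(cache_mutex, std::chrono::milliseconds(100));
        if (!lock.owns_lock())
            return false;

        /// ... perform the reservation under the lock ...
        return true;
    }

    void resize()
    {
        is_being_resized.store(true, std::memory_order_relaxed);
        // Poor man's SCOPE_EXIT: reset the flag whenever we leave this scope.
        struct Reset
        {
            std::atomic<bool> & flag;
            ~Reset() { flag.store(false, std::memory_order_relaxed); }
        } reset{is_being_resized};

        std::lock_guard lock(cache_mutex);
        /// ... apply the new max_size / max_elements under the lock ...
    }
};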

View File

@ -202,6 +202,7 @@ private:
mutable std::mutex init_mutex;
std::unique_ptr<StatusFile> status_file;
std::atomic<bool> shutdown = false;
std::atomic<bool> cache_is_being_resized = false;
std::mutex apply_settings_mutex;

View File

@ -63,8 +63,12 @@ public:
virtual size_t getSize(const CachePriorityGuard::Lock &) const = 0;
virtual size_t getSizeApprox() const = 0;
virtual size_t getElementsCount(const CachePriorityGuard::Lock &) const = 0;
virtual size_t getElementsCountApprox() const = 0;
/// Throws exception if there is not enough size to fit it.
virtual IteratorPtr add( /// NOLINT
KeyMetadataPtr key_metadata,

View File

@ -28,6 +28,10 @@ public:
size_t getElementsCount(const CachePriorityGuard::Lock &) const override { return state->current_elements_num; }
size_t getSizeApprox() const override { return state->current_size; }
size_t getElementsCountApprox() const override { return state->current_elements_num; }
bool canFit( /// NOLINT
size_t size,
const CachePriorityGuard::Lock &,

View File

@ -44,6 +44,16 @@ size_t SLRUFileCachePriority::getElementsCount(const CachePriorityGuard::Lock &
return protected_queue.getElementsCount(lock) + probationary_queue.getElementsCount(lock);
}
size_t SLRUFileCachePriority::getSizeApprox() const
{
return protected_queue.getSizeApprox() + probationary_queue.getSizeApprox();
}
size_t SLRUFileCachePriority::getElementsCountApprox() const
{
return protected_queue.getElementsCountApprox() + probationary_queue.getElementsCountApprox();
}
bool SLRUFileCachePriority::canFit( /// NOLINT
size_t size,
const CachePriorityGuard::Lock & lock,
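Note: the getSizeApprox()/getElementsCountApprox() additions above are what let FileCache::getUsedCacheSize() and getFileSegmentsNum() stop taking the cache lock for metrics. The general idea, as a minimal standalone sketch (names are illustrative, not the real CachePriorityGuard API):

#include <atomic>
#include <cstddef>
#include <mutex>

class QueueStatsSketch
{
public:
    using Lock = std::lock_guard<std::mutex>;

    // Exact value: the caller must hold the queue lock.
    size_t getSize(const Lock &) const { return current_size; }

    // Approximate value: lock-free, possibly slightly stale, good enough for metrics.
    size_t getSizeApprox() const { return current_size_approx.load(std::memory_order_relaxed); }

    void add(size_t bytes, const Lock &)
    {
        current_size += bytes;
        current_size_approx.store(current_size, std::memory_order_relaxed);
    }

private:
    size_t current_size = 0;                     // protected by the queue lock
    std::atomic<size_t> current_size_approx{0};  // mirrored for unlocked readers
};

For SLRU the approximate values are simply the sums over the protected and probationary queues, exactly as in the hunk above.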

View File

@ -25,6 +25,10 @@ public:
size_t getElementsCount(const CachePriorityGuard::Lock &) const override;
size_t getSizeApprox() const override;
size_t getElementsCountApprox() const override;
bool canFit( /// NOLINT
size_t size,
const CachePriorityGuard::Lock &,

View File

@ -26,7 +26,6 @@
#include <Common/noexcept_scope.h>
#include <Common/checkStackSize.h>
#include "Interpreters/Context_fwd.h"
#include "config.h"
#if USE_MYSQL

View File

@ -22,9 +22,6 @@
#include <set>
#include <unordered_map>
#include <unordered_set>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{

View File

@ -86,6 +86,7 @@ void replaceStorageInQueryTree(QueryTreeNodePtr & query_tree, const ContextPtr &
continue;
auto replacement_table_expression = std::make_shared<TableNode>(storage, context);
replacement_table_expression->setAlias(node->getAlias());
if (auto table_expression_modifiers = table_node.getTableExpressionModifiers())
replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers);

View File

@ -52,7 +52,12 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess
{
auto txn = session_context->getCurrentTransaction();
if (!txn)
{
if (session_context->getClientInfo().interface == ClientInfo::Interface::MYSQL)
return {};
else
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
}
if (txn->getState() != MergeTreeTransaction::RUNNING)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state");
@ -111,7 +116,12 @@ BlockIO InterpreterTransactionControlQuery::executeRollback(ContextMutablePtr se
{
auto txn = session_context->getCurrentTransaction();
if (!txn)
{
if (session_context->getClientInfo().interface == ClientInfo::Interface::MYSQL)
return {};
else
throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
}
if (txn->getState() == MergeTreeTransaction::COMMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state");
if (txn->getState() == MergeTreeTransaction::COMMITTING)
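Note: the hunks above make COMMIT/ROLLBACK without an active transaction a silent no-op for connections over the MySQL wire protocol, while other interfaces keep getting INVALID_TRANSACTION. A compact standalone illustration of that guard (the enum and types are stand-ins, not the real ClientInfo/MergeTreeTransaction):

#include <optional>
#include <stdexcept>

enum class Interface { Native, HTTP, MySQL };
struct Transaction { /* ... */ };

void executeCommit(const std::optional<Transaction> & current_txn, Interface client_interface)
{
    if (!current_txn)
    {
        if (client_interface == Interface::MySQL)
            return;  // MySQL clients may send COMMIT/ROLLBACK even without an explicit transaction; succeed silently
        throw std::runtime_error("There is no current transaction");
    }
    /// ... check the transaction state and commit it ...
}

The new shell test further down drives this through ${MYSQL_CLIENT} and simply counts the echoed COMMIT/ROLLBACK lines.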

View File

@ -402,10 +402,17 @@ MutationsInterpreter::MutationsInterpreter(
, metadata_snapshot(metadata_snapshot_)
, commands(std::move(commands_))
, available_columns(std::move(available_columns_))
, context(Context::createCopy(context_))
, settings(std::move(settings_))
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits())
{
auto new_context = Context::createCopy(context_);
if (new_context->getSettingsRef().allow_experimental_analyzer)
{
new_context->setSetting("allow_experimental_analyzer", false);
LOG_DEBUG(&Poco::Logger::get("MutationsInterpreter"), "Will use old analyzer to prepare mutation");
}
context = std::move(new_context);
prepare(!settings.can_execute);
}

View File

@ -29,14 +29,14 @@ const DB::DataStream & getChildOutputStream(DB::QueryPlan::Node & node)
namespace DB::QueryPlanOptimizations
{
/// This is a check that output columns do not have the same name
/// This is a check that node columns do not have the same name
/// This is ok for a DAG, but may introduce a bug in a SortingStep because columns are selected by name.
static bool areOutputsConvertableToBlock(const ActionsDAG::NodeRawConstPtrs & outputs)
static bool areNodesConvertableToBlock(const ActionsDAG::NodeRawConstPtrs & nodes)
{
std::unordered_set<std::string_view> names;
for (const auto & output : outputs)
for (const auto & node : nodes)
{
if (!names.emplace(output->result_name).second)
if (!names.emplace(node->result_name).second)
return false;
}
@ -72,7 +72,7 @@ size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan:
if (unneeded_for_sorting->trivial())
return 0;
if (!areOutputsConvertableToBlock(needed_for_sorting->getOutputs()))
if (!areNodesConvertableToBlock(needed_for_sorting->getOutputs()) || !areNodesConvertableToBlock(unneeded_for_sorting->getInputs()))
return 0;
// Sorting (parent_node) -> Expression (child_node)
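Note: the rename above reflects that the uniqueness check is now applied both to the outputs needed for sorting and to the inputs of the part lifted past the SortingStep. As a standalone helper (NodeLike is a stand-in for ActionsDAG::Node), the check is just name de-duplication:

#include <string>
#include <string_view>
#include <unordered_set>
#include <vector>

struct NodeLike { std::string result_name; };

// A DAG may contain several nodes with the same result name, but a Block
// addresses columns by name, so the conversion is only safe when names are unique.
bool areNodesConvertableToBlock(const std::vector<const NodeLike *> & nodes)
{
    std::unordered_set<std::string_view> names;
    for (const auto * node : nodes)
        if (!names.emplace(node->result_name).second)
            return false;
    return true;
}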

View File

@ -41,6 +41,10 @@
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <filesystem>
namespace fs = std::filesystem;
using namespace Azure::Storage::Blobs;
namespace CurrentMetrics

View File

@ -885,6 +885,7 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
if (modified_query_info.table_expression)
{
auto replacement_table_expression = std::make_shared<TableNode>(storage, storage_lock, storage_snapshot_);
replacement_table_expression->setAlias(modified_query_info.table_expression->getAlias());
if (query_info.table_expression_modifiers)
replacement_table_expression->setTableExpressionModifiers(*query_info.table_expression_modifiers);
@ -1025,7 +1026,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
const auto & [database_name, storage, _, table_name] = storage_with_lock;
bool allow_experimental_analyzer = context->getSettingsRef().allow_experimental_analyzer;
auto storage_stage
= storage->getQueryProcessingStage(context, QueryProcessingStage::Complete, storage_snapshot_, modified_query_info);
= storage->getQueryProcessingStage(context, processed_stage, storage_snapshot_, modified_query_info);
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
@ -1052,6 +1053,47 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
Block pipe_header = builder->getHeader();
if (allow_experimental_analyzer)
{
String table_alias = modified_query_info.query_tree->as<QueryNode>()->getJoinTree()->as<TableNode>()->getAlias();
String database_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_database" : table_alias + "._database";
String table_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_table" : table_alias + "._table";
if (has_database_virtual_column && common_header.has(database_column)
&& (storage_stage == QueryProcessingStage::FetchColumns || !pipe_header.has("'" + database_name + "'_String")))
{
ColumnWithTypeAndName column;
column.name = database_column;
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(database_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
if (has_table_virtual_column && common_header.has(table_column)
&& (storage_stage == QueryProcessingStage::FetchColumns || !pipe_header.has("'" + table_name + "'_String")))
{
ColumnWithTypeAndName column;
column.name = table_column;
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(table_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
}
else
{
if (has_database_virtual_column && common_header.has("_database") && !pipe_header.has("_database"))
{
ColumnWithTypeAndName column;
@ -1062,7 +1104,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
@ -1077,15 +1118,15 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
}
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams with a structure equal to the structure of the Merge table.
convertAndFilterSourceStream(
header, modified_query_info, storage_snapshot_, aliases, row_policy_data_opt, context, *builder, processed_stage);
header, modified_query_info, storage_snapshot_, aliases, row_policy_data_opt, context, *builder, storage_stage);
}
return builder;
@ -1116,13 +1157,13 @@ QueryPlan ReadFromMerge::createPlanForTable(
bool allow_experimental_analyzer = modified_context->getSettingsRef().allow_experimental_analyzer;
auto storage_stage = storage->getQueryProcessingStage(modified_context,
QueryProcessingStage::Complete,
processed_stage,
storage_snapshot_,
modified_query_info);
QueryPlan plan;
if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns))
if (processed_stage <= storage_stage)
{
/// If there are only virtual columns in the query, we must request at least one other column.
if (real_column_names.empty())
@ -1167,7 +1208,7 @@ QueryPlan ReadFromMerge::createPlanForTable(
row_policy_data_opt->addStorageFilter(source_step_with_filter);
}
}
else if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
else if (processed_stage > storage_stage || allow_experimental_analyzer)
{
/// Maximum permissible parallelism is streams_num
modified_context->setSetting("max_threads", streams_num);
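Note: under the analyzer, the _database/_table virtual columns may be requested under an alias-qualified name. The naming rule used by the hunk above boils down to this small helper (a sketch; the real code inlines the expression):

#include <string>

// FetchColumns keeps the plain name; later stages with a table alias use "<alias>._database" / "<alias>._table".
std::string mergeVirtualColumnName(const std::string & table_alias, bool is_fetch_columns_stage, const std::string & base_name)
{
    return (table_alias.empty() || is_fetch_columns_stage) ? base_name : table_alias + "." + base_name;
}

// e.g. mergeVirtualColumnName("m", /*is_fetch_columns_stage=*/ false, "_database") == "m._database"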

View File

@ -1479,8 +1479,11 @@ UInt64 StorageMergeTree::getCurrentMutationVersion(
size_t StorageMergeTree::clearOldMutations(bool truncate)
{
size_t finished_mutations_to_keep = truncate ? 0 : getSettings()->finished_mutations_to_keep;
size_t finished_mutations_to_keep = getSettings()->finished_mutations_to_keep;
if (!truncate && !finished_mutations_to_keep)
return 0;
finished_mutations_to_keep = truncate ? 0 : finished_mutations_to_keep;
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::lock_guard lock(currently_processing_in_background_mutex);
@ -1899,8 +1902,6 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
}
}
/// Old part objects need to be destroyed before clearing them from the filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
@ -1985,8 +1986,6 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
}
}
/// Old parts need to be destroyed before clearing them from the filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
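Note: the reordering above changes what finished_mutations_to_keep = 0 means for a regular cleanup versus a forced one. A minimal sketch of the new control flow (the actual collection and removal is elided):

#include <cstddef>

// finished_mutations_to_keep == 0 now means "keep all finished mutations" for the
// periodic cleanup, while a call with truncate == true still clears everything.
size_t clearOldMutations(bool truncate, size_t finished_mutations_to_keep)
{
    if (!truncate && !finished_mutations_to_keep)
        return 0;                                              // cleanup disabled by the setting
    finished_mutations_to_keep = truncate ? 0 : finished_mutations_to_keep;
    /// ... collect and remove finished mutations beyond `finished_mutations_to_keep` ...
    return 0;                                                  // number of removed mutations in the real code
}

The removed clearOldMutations(true) calls in dropPart()/dropPartition() rely on the regular cleanup path instead, which the new SQL test further down exercises with finished_mutations_to_keep = 0.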

View File

@ -4,27 +4,28 @@
#if USE_AWS_S3
#include <Core/Types.h>
#include <Compression/CompressionInfo.h>
#include <Storages/IStorage.h>
#include <Storages/StorageS3Settings.h>
#include <Processors/SourceWithKeyCondition.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Poco/URI.h>
#include <IO/S3/getObjectInfo.h>
#include <Core/Types.h>
#include <IO/CompressionMethod.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/Context.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/SourceWithKeyCondition.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/prepareReadingFromFormat.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Poco/URI.h>
#include <Common/threadPoolCallbackRunner.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{

View File

@ -1,5 +1,4 @@
00223_shard_distributed_aggregation_memory_efficient
00717_merge_and_distributed
00725_memory_tracking
01062_pm_all_join_with_block_continuation
01083_expressions_in_engine_arguments

View File

@ -19,10 +19,10 @@ node_no_backoff = cluster.add_instance(
with_zookeeper=True,
)
REPLICATED_POSPONE_MUTATION_LOG = (
REPLICATED_POSTPONE_MUTATION_LOG = (
"According to exponential backoff policy, put aside this log entry"
)
POSPONE_MUTATION_LOG = (
POSTPONE_MUTATION_LOG = (
"According to exponential backoff policy, do not perform mutations for the part"
)
@ -56,21 +56,36 @@ def started_cluster():
@pytest.mark.parametrize(
("node"),
("node, found_in_log"),
[
(node_with_backoff),
(
node_with_backoff,
True,
),
(
node_no_backoff,
False,
),
],
)
def test_exponential_backoff_with_merge_tree(started_cluster, node):
def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log):
prepare_cluster(False)
def check_logs():
if found_in_log:
assert node.wait_for_log_line(POSTPONE_MUTATION_LOG)
# Do not rotate the logs when we are checking the absence of a log message
node.rotate_logs()
else:
# Best effort: if this check fails, the logs definitely contain the problematic message
assert not node.contains_in_log(POSTPONE_MUTATION_LOG)
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert node.wait_for_log_line(POSPONE_MUTATION_LOG)
node.rotate_logs()
check_logs()
node.query("KILL MUTATION WHERE table='test_mutations'")
# Check that after the kill, mutations for new parts are still being postponed.
@ -78,7 +93,7 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node):
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert node.wait_for_log_line(POSPONE_MUTATION_LOG)
check_logs()
def test_exponential_backoff_with_replicated_tree(started_cluster):
@ -88,36 +103,37 @@ def test_exponential_backoff_with_replicated_tree(started_cluster):
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert node_with_backoff.wait_for_log_line(REPLICATED_POSPONE_MUTATION_LOG)
assert not node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG)
assert node_with_backoff.wait_for_log_line(REPLICATED_POSTPONE_MUTATION_LOG)
assert not node_no_backoff.contains_in_log(REPLICATED_POSTPONE_MUTATION_LOG)
@pytest.mark.parametrize(
("node"),
[
(node_with_backoff),
],
)
def test_exponential_backoff_create_dependent_table(started_cluster, node):
def test_exponential_backoff_create_dependent_table(started_cluster):
prepare_cluster(False)
# Executing incorrect mutation.
node.query(
node_with_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
# Creating dependent table for mutation.
node.query("CREATE TABLE dep_table(x UInt32) ENGINE MergeTree() ORDER BY x")
node_with_backoff.query(
"CREATE TABLE dep_table(x UInt32) ENGINE MergeTree() ORDER BY x"
)
retry_count = 100
no_unfinished_mutation = False
for _ in range(0, retry_count):
if node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n":
if (
node_with_backoff.query(
"SELECT count() FROM system.mutations WHERE is_done=0"
)
== "0\n"
):
no_unfinished_mutation = True
break
assert no_unfinished_mutation
node.query("DROP TABLE IF EXISTS dep_table SYNC")
node_with_backoff.query("DROP TABLE IF EXISTS dep_table SYNC")
def test_exponential_backoff_setting_override(started_cluster):
@ -133,7 +149,7 @@ def test_exponential_backoff_setting_override(started_cluster):
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert not node.contains_in_log(POSPONE_MUTATION_LOG)
assert not node.contains_in_log(POSTPONE_MUTATION_LOG)
@pytest.mark.parametrize(
@ -152,14 +168,14 @@ def test_backoff_clickhouse_restart(started_cluster, replicated_table):
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
assert node.wait_for_log_line(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)
node.restart_clickhouse()
node.rotate_logs()
assert node.wait_for_log_line(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)
@ -181,7 +197,7 @@ def test_no_backoff_after_killing_mutation(started_cluster, replicated_table):
# Executing correct mutation.
node.query("ALTER TABLE test_mutations DELETE WHERE x=1")
assert node.wait_for_log_line(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)
mutation_ids = node.query("select mutation_id from system.mutations").split()
@ -190,5 +206,5 @@ def test_no_backoff_after_killing_mutation(started_cluster, replicated_table):
)
node.rotate_logs()
assert not node.contains_in_log(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)

View File

@ -156,15 +156,33 @@ def test_merge_tree_load_parts_corrupted(started_cluster):
node1.query("SYSTEM WAIT LOADING PARTS mt_load_parts_2")
def check_parts_loading(node, partition, loaded, failed, skipped):
# The whole test produces around 600-700 lines, so 2000 is more than enough.
# wait_for_log_line uses tail + grep, so the overhead is negligible
look_behind_lines = 2000
for min_block, max_block in loaded:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading Active part {part_name}")
assert node.contains_in_log(f"Finished loading Active part {part_name}")
assert node.wait_for_log_line(
f"Loading Active part {part_name}", look_behind_lines=look_behind_lines
)
assert node.wait_for_log_line(
f"Finished loading Active part {part_name}",
look_behind_lines=look_behind_lines,
)
failed_part_names = []
# Let's wait until there is some information about all expected parts, and only
# check for the absence of unexpected log messages once all expected logs are present
for min_block, max_block in failed:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading Active part {part_name}")
assert not node.contains_in_log(f"Finished loading Active part {part_name}")
failed_part_names.append(part_name)
assert node.wait_for_log_line(
f"Loading Active part {part_name}", look_behind_lines=look_behind_lines
)
for failed_part_name in failed_part_names:
assert not node.contains_in_log(
f"Finished loading Active part {failed_part_name}"
)
for min_block, max_block in skipped:
part_name = f"{partition}_{min_block}_{max_block}"

View File

@ -131,14 +131,13 @@ def test_all_projection_files_are_dropped_when_part_is_dropped(
"""
)
objects_empty_table = list_objects(cluster)
node.query(
"ALTER TABLE test_all_projection_files_are_dropped ADD projection b_order (SELECT a, b ORDER BY b)"
)
node.query(
"ALTER TABLE test_all_projection_files_are_dropped MATERIALIZE projection b_order"
)
objects_empty_table = list_objects(cluster)
node.query(
"""

View File

@ -10,8 +10,9 @@ ENGINE = TinyLog;
INSERT INTO simple_key_simple_attributes_source_table VALUES(0, 'value_0', 'value_second_0');
INSERT INTO simple_key_simple_attributes_source_table VALUES(1, 'value_1', 'value_second_1');
INSERT INTO simple_key_simple_attributes_source_table VALUES(2, 'value_2', 'value_second_2');
INSERT INTO simple_key_simple_attributes_source_table SELECT number + 10 as id, concat('value_', toString(id)), concat('value_second_', toString(id)) FROM numbers_mt(1_000_000);
{% for dictionary_config in ['', 'SHARDS 16'] -%}
{% for dictionary_config in ['', 'SHARDS 16 SHARD_LOAD_QUEUE_BACKLOG 2'] -%}
DROP DICTIONARY IF EXISTS hashed_array_dictionary_simple_key_simple_attributes;
CREATE DICTIONARY hashed_array_dictionary_simple_key_simple_attributes
@ -42,7 +43,7 @@ SELECT dictGetOrDefault('hashed_array_dictionary_simple_key_simple_attributes',
SELECT 'dictHas';
SELECT dictHas('hashed_array_dictionary_simple_key_simple_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM hashed_array_dictionary_simple_key_simple_attributes ORDER BY id;
SELECT * FROM hashed_array_dictionary_simple_key_simple_attributes ORDER BY id LIMIT 3;
DROP DICTIONARY hashed_array_dictionary_simple_key_simple_attributes;
{% endfor %}
@ -61,8 +62,9 @@ ENGINE = TinyLog;
INSERT INTO simple_key_complex_attributes_source_table VALUES(0, 'value_0', 'value_second_0');
INSERT INTO simple_key_complex_attributes_source_table VALUES(1, 'value_1', NULL);
INSERT INTO simple_key_complex_attributes_source_table VALUES(2, 'value_2', 'value_second_2');
INSERT INTO simple_key_complex_attributes_source_table SELECT number + 10 as id, concat('value_', toString(id)), concat('value_second_', toString(id)) FROM numbers_mt(1_000_000);
{% for dictionary_config in ['', 'SHARDS 16'] -%}
{% for dictionary_config in ['', 'SHARDS 16 SHARD_LOAD_QUEUE_BACKLOG 2'] -%}
DROP DICTIONARY IF EXISTS hashed_array_dictionary_simple_key_complex_attributes;
CREATE DICTIONARY hashed_array_dictionary_simple_key_complex_attributes
@ -92,7 +94,7 @@ SELECT dictGetOrDefault('hashed_array_dictionary_simple_key_complex_attributes',
SELECT 'dictHas';
SELECT dictHas('hashed_array_dictionary_simple_key_complex_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM hashed_array_dictionary_simple_key_complex_attributes ORDER BY id;
SELECT * FROM hashed_array_dictionary_simple_key_complex_attributes ORDER BY id LIMIT 3;
DROP DICTIONARY hashed_array_dictionary_simple_key_complex_attributes;

View File

@ -12,7 +12,9 @@ INSERT INTO complex_key_simple_attributes_source_table VALUES(0, 'id_key_0', 'va
INSERT INTO complex_key_simple_attributes_source_table VALUES(1, 'id_key_1', 'value_1', 'value_second_1');
INSERT INTO complex_key_simple_attributes_source_table VALUES(2, 'id_key_2', 'value_2', 'value_second_2');
{% for dictionary_config in ['', 'SHARDS 16'] -%}
INSERT INTO complex_key_simple_attributes_source_table SELECT number + 10 as id, concat('id_key_', toString(id)), toString(id), toString(id) FROM numbers_mt(1_000_000);
{% for dictionary_config in ['', 'SHARDS 16 SHARD_LOAD_QUEUE_BACKLOG 2'] -%}
DROP DICTIONARY IF EXISTS hashed_array_dictionary_complex_key_simple_attributes;
CREATE DICTIONARY hashed_array_dictionary_complex_key_simple_attributes
@ -43,7 +45,7 @@ SELECT dictGetOrDefault('hashed_array_dictionary_complex_key_simple_attributes',
SELECT 'dictHas';
SELECT dictHas('hashed_array_dictionary_complex_key_simple_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM hashed_array_dictionary_complex_key_simple_attributes ORDER BY (id, id_key);
SELECT * FROM hashed_array_dictionary_complex_key_simple_attributes ORDER BY (id, id_key) LIMIT 3;
DROP DICTIONARY hashed_array_dictionary_complex_key_simple_attributes;
@ -64,8 +66,9 @@ ENGINE = TinyLog;
INSERT INTO complex_key_complex_attributes_source_table VALUES(0, 'id_key_0', 'value_0', 'value_second_0');
INSERT INTO complex_key_complex_attributes_source_table VALUES(1, 'id_key_1', 'value_1', NULL);
INSERT INTO complex_key_complex_attributes_source_table VALUES(2, 'id_key_2', 'value_2', 'value_second_2');
INSERT INTO complex_key_complex_attributes_source_table SELECT number + 10 as id, concat('id_key_', toString(id)), toString(id), toString(id) FROM numbers_mt(1_000_000);
{% for dictionary_config in ['', 'SHARDS 16'] -%}
{% for dictionary_config in ['', 'SHARDS 16 SHARD_LOAD_QUEUE_BACKLOG 2'] -%}
DROP DICTIONARY IF EXISTS hashed_array_dictionary_complex_key_complex_attributes;
CREATE DICTIONARY hashed_array_dictionary_complex_key_complex_attributes
@ -97,7 +100,7 @@ SELECT dictGetOrDefault('hashed_array_dictionary_complex_key_complex_attributes'
SELECT 'dictHas';
SELECT dictHas('hashed_array_dictionary_complex_key_complex_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM hashed_array_dictionary_complex_key_complex_attributes ORDER BY (id, id_key);
SELECT * FROM hashed_array_dictionary_complex_key_complex_attributes ORDER BY (id, id_key) LIMIT 3;
{% endfor %}

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
format=$1
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS file"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE file (x UInt64) ENGINE = File($format, '${CLICKHOUSE_DATABASE}/data.$format.lz4')"
for size in 10000 100000 1000000 2500000
do
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE file"
${CLICKHOUSE_CLIENT} --query "INSERT INTO file SELECT * FROM numbers($size)"
${CLICKHOUSE_CLIENT} --query "SELECT max(x) FROM file"
done
${CLICKHOUSE_CLIENT} --query "DROP TABLE file"

View File

@ -1,45 +0,0 @@
Native
9999
99999
999999
2499999
Values
9999
99999
999999
2499999
JSONCompactEachRow
9999
99999
999999
2499999
TSKV
9999
99999
999999
2499999
TSV
9999
99999
999999
2499999
CSV
9999
99999
999999
2499999
JSONEachRow
9999
99999
999999
2499999
JSONCompactEachRow
9999
99999
999999
2499999
JSONStringsEachRow
9999
99999
999999
2499999

View File

@ -1,21 +0,0 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
for format in Native Values JSONCompactEachRow TSKV TSV CSV JSONEachRow JSONCompactEachRow JSONStringsEachRow
do
echo $format
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS file"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE file (x UInt64) ENGINE = File($format, '${CLICKHOUSE_DATABASE}/data.$format.lz4')"
for size in 10000 100000 1000000 2500000
do
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE file"
${CLICKHOUSE_CLIENT} --query "INSERT INTO file SELECT * FROM numbers($size)"
${CLICKHOUSE_CLIENT} --query "SELECT max(x) FROM file"
done
done
${CLICKHOUSE_CLIENT} --query "DROP TABLE file"

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib CSV

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib JSONCompactEachRow

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib JSONEachRow

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib JSONStringsEachRow

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib Native

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib TSKV

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib TSV

View File

@ -0,0 +1,4 @@
9999
99999
999999
2499999

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
${CUR_DIR}/02125_lz4_compression_bug.lib Values

View File

@ -0,0 +1,2 @@
1
1

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: requires mysql client
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${MYSQL_CLIENT} --verbose --execute "COMMIT;" | grep -c "COMMIT"
${MYSQL_CLIENT} --verbose --execute "ROLLBACK;" | grep -c "ROLLBACK"

View File

@ -0,0 +1,4 @@
mutations after ALTER for data_rmt 1
mutations after cleanup for data_rmt 1
mutations after ALTER for data_mt 1
mutations after cleanup for data_mt 1

View File

@ -0,0 +1,14 @@
drop table if exists data_rmt;
drop table if exists data_mt;
create table data_rmt (key Int) engine=ReplicatedMergeTree('/tables/{database}/data', '{table}') order by tuple() settings finished_mutations_to_keep=0, merge_tree_clear_old_parts_interval_seconds=1;
create table data_mt (key Int) engine=MergeTree() order by tuple() settings finished_mutations_to_keep=0, merge_tree_clear_old_parts_interval_seconds=1;
{% for table in ['data_rmt', 'data_mt'] %}
alter table {{table}} delete where 1 settings mutations_sync = 1;
select 'mutations after ALTER for {{table}}', count() from system.mutations where database = currentDatabase() and table = '{{table}}';
-- merge_tree_clear_old_parts_interval_seconds=1, but wait a few seconds more
select sleep(5) settings function_sleep_max_microseconds_per_block=10e6 format Null;
select 'mutations after cleanup for {{table}}', count() from system.mutations where database = currentDatabase() and table = '{{table}}';
drop table {{table}};
{% endfor %}

View File

@ -0,0 +1,3 @@
1
2
3

View File

@ -0,0 +1,17 @@
DROP TABLE IF EXISTS v;
CREATE VIEW v (`date` UInt32,`value` UInt8) AS
WITH
data AS (SELECT '' id LIMIT 0),
r AS (SELECT'' as id, 1::UInt8 as value)
SELECT
now() as date,
value AND (data.id IN (SELECT '' as d from system.one)) AS value
FROM data
LEFT JOIN r ON data.id = r.id;
SELECT 1;
SELECT date, value FROM v;
SELECT 2;
SELECT date, value FROM v ORDER BY date;
SELECT 3;
DROP TABLE v;

View File

@ -0,0 +1,3 @@
CREATE TABLE 03013_position_const_start_pos (n Int16) ENGINE = Memory;
INSERT INTO 03013_position_const_start_pos SELECT * FROM generateRandom() LIMIT 1000;
SELECT position(concat(NULLIF(1, 1), materialize(3)), 'ca', 2) FROM 03013_position_const_start_pos FORMAT Null;

View File

@ -0,0 +1,4 @@
000000000000
000000000000
000000000000
000000000000

View File

@ -0,0 +1,4 @@
SELECT repeat(toString(number), toUInt256(12)) FROM numbers(1);
SELECT repeat(toString(number), toUInt128(12)) FROM numbers(1);
SELECT repeat(toString(number), toInt256(12)) FROM numbers(1);
SELECT repeat(toString(number), toInt128(12)) FROM numbers(1);