Merge branch 'master' into keeper-sequential-consistency-reads

This commit is contained in:
mergify[bot] 2022-06-21 08:13:04 +00:00 committed by GitHub
commit bdd5b94192
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 860 additions and 322 deletions

2
contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit 81b413cc1c2a33ad4e96df856b89184efbd6221c
Subproject commit 6062e711a919fb3b669b243b7dceabd045d0e4a2

View File

@ -0,0 +1,36 @@
#pragma once
#include <base/scope_guard.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#define NOEXCEPT_SCOPE_IMPL_CONCAT(n, expected) \
LockMemoryExceptionInThread lock_memory_tracker##n(VariableContext::Global); \
SCOPE_EXIT( \
{ \
const auto uncaught = std::uncaught_exceptions(); \
assert((expected) == uncaught || (expected) + 1 == uncaught); \
if ((expected) < uncaught) \
{ \
tryLogCurrentException("NOEXCEPT_SCOPE"); \
abort(); \
} \
} \
)
#define NOEXCEPT_SCOPE_IMPL(n, expected) NOEXCEPT_SCOPE_IMPL_CONCAT(n, expected)
#define NOEXCEPT_SCOPE_CONCAT(n) \
const auto num_curr_exceptions##n = std::uncaught_exceptions(); \
NOEXCEPT_SCOPE_IMPL(n, num_curr_exceptions##n)
#define NOEXCEPT_SCOPE_FWD(n) NOEXCEPT_SCOPE_CONCAT(n)
/// It can be used in critical places to exit on unexpected exceptions.
/// SIGABRT is usually better that broken in-memory state with unpredictable consequences.
/// It also temporarily disables exception from memory tracker in current thread.
/// Strict version does not take into account nested exception (i.e. it aborts even when we're in catch block).
#define NOEXCEPT_SCOPE_STRICT NOEXCEPT_SCOPE_IMPL(__LINE__, 0)
#define NOEXCEPT_SCOPE NOEXCEPT_SCOPE_FWD(__LINE__)

View File

@ -406,6 +406,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.", 0) \
M(Bool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.", 0) \
M(Bool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_read_in_window_order, true, "Enable ORDER BY optimization in window clause for reading data in corresponding order in MergeTree tables.", 0) \
M(Bool, optimize_aggregation_in_order, false, "Enable GROUP BY optimization for aggregating data in corresponding order in MergeTree tables.", 0) \
M(UInt64, aggregation_in_order_max_block_bytes, 50000000, "Maximal size of block in bytes accumulated during aggregation in order of primary key. Lower block size allows to parallelize more final merge stage of aggregation.", 0) \
M(UInt64, read_in_order_two_level_merge_threshold, 100, "Minimal number of parts to read to run preliminary merge step during multithread reading in order of primary key.", 0) \

View File

@ -140,8 +140,11 @@ struct SortCursorImpl
bool isFirst() const { return pos == 0; }
bool isLast() const { return pos + 1 >= rows; }
bool isLast(size_t size) const { return pos + size >= rows; }
bool isValid() const { return pos < rows; }
void next() { ++pos; }
void next(size_t size) { pos += size; }
size_t getSize() const { return rows; }
/// Prevent using pos instead of getRow()
private:
@ -168,6 +171,11 @@ struct SortCursorHelper
return derived().greaterAt(rhs.derived(), impl->getRow(), rhs.impl->getRow());
}
bool ALWAYS_INLINE greaterWithOffset(const SortCursorHelper & rhs, size_t lhs_offset, size_t rhs_offset) const
{
return derived().greaterAt(rhs.derived(), impl->getRow() + lhs_offset, rhs.impl->getRow() + rhs_offset);
}
/// Inverted so that the priority queue elements are removed in ascending order.
bool ALWAYS_INLINE operator< (const SortCursorHelper & rhs) const
{
@ -322,66 +330,126 @@ struct SortCursorWithCollation : SortCursorHelper<SortCursorWithCollation>
}
};
enum class SortingQueueStrategy
{
Default,
Batch
};
/** Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
* TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm
*/
template <typename Cursor>
class SortingHeap
/// Allows to fetch data from multiple sort cursors in sorted order (merging sorted data streams).
template <typename Cursor, SortingQueueStrategy strategy>
class SortingQueueImpl
{
public:
SortingHeap() = default;
SortingQueueImpl() = default;
template <typename Cursors>
explicit SortingHeap(Cursors & cursors)
explicit SortingQueueImpl(Cursors & cursors)
{
size_t size = cursors.size();
queue.reserve(size);
for (size_t i = 0; i < size; ++i)
if (!cursors[i].empty())
queue.emplace_back(&cursors[i]);
{
if (cursors[i].empty())
continue;
queue.emplace_back(&cursors[i]);
}
std::make_heap(queue.begin(), queue.end());
if constexpr (strategy == SortingQueueStrategy::Batch)
{
if (!queue.empty())
updateBatchSize();
}
}
bool isValid() const { return !queue.empty(); }
Cursor & current() { return queue.front(); }
Cursor & current() requires (strategy == SortingQueueStrategy::Default)
{
return queue.front();
}
std::pair<Cursor *, size_t> current() requires (strategy == SortingQueueStrategy::Batch)
{
return {&queue.front(), batch_size};
}
size_t size() { return queue.size(); }
Cursor & nextChild() { return queue[nextChildIndex()]; }
void ALWAYS_INLINE next()
void ALWAYS_INLINE next() requires (strategy == SortingQueueStrategy::Default)
{
assert(isValid());
if (!current()->isLast())
if (!queue.front()->isLast())
{
current()->next();
updateTop();
queue.front()->next();
updateTop(true /*check_in_order*/);
}
else
{
removeTop();
}
}
void ALWAYS_INLINE next(size_t batch_size_value) requires (strategy == SortingQueueStrategy::Batch)
{
assert(isValid());
assert(batch_size_value <= batch_size);
assert(batch_size_value > 0);
batch_size -= batch_size_value;
if (batch_size > 0)
{
queue.front()->next(batch_size_value);
return;
}
if (!queue.front()->isLast(batch_size_value))
{
queue.front()->next(batch_size_value);
updateTop(false /*check_in_order*/);
}
else
{
removeTop();
}
}
void replaceTop(Cursor new_top)
{
current() = new_top;
updateTop();
queue.front() = new_top;
updateTop(true /*check_in_order*/);
}
void removeTop()
{
std::pop_heap(queue.begin(), queue.end());
queue.pop_back();
next_idx = 0;
next_child_idx = 0;
if constexpr (strategy == SortingQueueStrategy::Batch)
{
if (queue.empty())
batch_size = 0;
else
updateBatchSize();
}
}
void push(SortCursorImpl & cursor)
{
queue.emplace_back(&cursor);
std::push_heap(queue.begin(), queue.end());
next_idx = 0;
next_child_idx = 0;
if constexpr (strategy == SortingQueueStrategy::Batch)
updateBatchSize();
}
private:
@ -389,26 +457,27 @@ private:
Container queue;
/// Cache comparison between first and second child if the order in queue has not been changed.
size_t next_idx = 0;
size_t next_child_idx = 0;
size_t batch_size = 0;
size_t ALWAYS_INLINE nextChildIndex()
{
if (next_idx == 0)
if (next_child_idx == 0)
{
next_idx = 1;
next_child_idx = 1;
if (queue.size() > 2 && queue[1] < queue[2])
++next_idx;
if (queue.size() > 2 && queue[1].greater(queue[2]))
++next_child_idx;
}
return next_idx;
return next_child_idx;
}
/// This is adapted version of the function __sift_down from libc++.
/// Why cannot simply use std::priority_queue?
/// - because it doesn't support updating the top element and requires pop and push instead.
/// Also look at "Boost.Heap" library.
void ALWAYS_INLINE updateTop()
void ALWAYS_INLINE updateTop(bool check_in_order)
{
size_t size = queue.size();
if (size < 2)
@ -420,10 +489,14 @@ private:
auto child_it = begin + child_idx;
/// Check if we are in order.
if (*child_it < *begin)
if (check_in_order && (*child_it).greater(*begin))
{
if constexpr (strategy == SortingQueueStrategy::Batch)
updateBatchSize();
return;
}
next_idx = 0;
next_child_idx = 0;
auto curr_it = begin;
auto top(std::move(*begin));
@ -441,7 +514,7 @@ private:
child_it = begin + child_idx;
if ((child_idx + 1) < size && *child_it < *(child_it + 1))
if ((child_idx + 1) < size && (*child_it).greater(*(child_it + 1)))
{
/// Right child exists and is greater than left child.
++child_it;
@ -449,13 +522,57 @@ private:
}
/// Check if we are in order.
} while (!(*child_it < top));
} while (!((*child_it).greater(top)));
*curr_it = std::move(top);
if constexpr (strategy == SortingQueueStrategy::Batch)
updateBatchSize();
}
/// Update batch size of elements that client can extract from current cursor
void updateBatchSize()
{
assert(!queue.empty());
auto & begin_cursor = *queue.begin();
size_t min_cursor_size = begin_cursor->getSize();
size_t min_cursor_pos = begin_cursor->getPosRef();
if (queue.size() == 1)
{
batch_size = min_cursor_size - min_cursor_pos;
return;
}
batch_size = 1;
size_t child_idx = nextChildIndex();
auto & next_child_cursor = *(queue.begin() + child_idx);
if (min_cursor_pos + batch_size < min_cursor_size && next_child_cursor.greaterWithOffset(begin_cursor, 0, batch_size))
++batch_size;
else
return;
if (unlikely(begin_cursor.totallyLessOrEquals(next_child_cursor)))
{
batch_size = min_cursor_size - min_cursor_pos;
return;
}
while (min_cursor_pos + batch_size < min_cursor_size && next_child_cursor.greaterWithOffset(begin_cursor, 0, batch_size))
++batch_size;
}
};
template <typename Cursor>
using SortingQueue = SortingQueueImpl<Cursor, SortingQueueStrategy::Default>;
template <typename Cursor>
using SortingQueueBatch = SortingQueueImpl<Cursor, SortingQueueStrategy::Batch>;
/** SortQueueVariants allow to specialize sorting queue for concrete types and sort description.
* To access queue callOnVariant method must be used.
* To access queue variant callOnVariant method must be used.
* To access batch queue variant callOnBatchVariant method must be used.
*/
class SortQueueVariants
{
@ -476,7 +593,7 @@ public:
if (has_collation)
{
queue_variants = SortingHeap<SortCursorWithCollation>();
initializeQueues<SortCursorWithCollation>();
return;
}
else if (sort_description.size() == 1)
@ -491,16 +608,16 @@ public:
using ColumnDataType = typename Types::LeftType;
using ColumnType = typename ColumnDataType::ColumnType;
queue_variants = SortingHeap<SpecializedSingleColumnSortCursor<ColumnType>>();
initializeQueues<SpecializedSingleColumnSortCursor<ColumnType>>();
return true;
});
if (!result)
queue_variants = SortingHeap<SimpleSortCursor>();
initializeQueues<SimpleSortCursor>();
}
else
{
queue_variants = SortingHeap<SortCursor>();
initializeQueues<SortCursor>();
}
}
@ -512,17 +629,30 @@ public:
template <typename Func>
decltype(auto) callOnVariant(Func && func)
{
return std::visit(func, queue_variants);
return std::visit(func, default_queue_variants);
}
template <typename Func>
decltype(auto) callOnBatchVariant(Func && func)
{
return std::visit(func, batch_queue_variants);
}
bool variantSupportJITCompilation() const
{
return std::holds_alternative<SortingHeap<SimpleSortCursor>>(queue_variants)
|| std::holds_alternative<SortingHeap<SortCursor>>(queue_variants)
|| std::holds_alternative<SortingHeap<SortCursorWithCollation>>(queue_variants);
return std::holds_alternative<SortingQueue<SimpleSortCursor>>(default_queue_variants)
|| std::holds_alternative<SortingQueue<SortCursor>>(default_queue_variants)
|| std::holds_alternative<SortingQueue<SortCursorWithCollation>>(default_queue_variants);
}
private:
template <typename Cursor>
void initializeQueues()
{
default_queue_variants = SortingQueue<Cursor>();
batch_queue_variants = SortingQueueBatch<Cursor>();
}
static DataTypes extractSortDescriptionTypesFromHeader(const Block & header, const SortDescription & sort_description)
{
size_t sort_description_size = sort_description.size();
@ -537,39 +667,45 @@ private:
return data_types;
}
std::variant<
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UInt8>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UInt16>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UInt32>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UInt64>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UInt128>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UInt256>>>,
template <SortingQueueStrategy strategy>
using QueueVariants = std::variant<
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UInt8>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UInt16>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UInt32>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UInt64>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UInt128>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UInt256>>, strategy>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Int8>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Int16>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Int32>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Int64>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Int128>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Int256>>>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Int8>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Int16>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Int32>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Int64>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Int128>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Int256>>, strategy>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Float32>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<Float64>>>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Float32>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<Float64>>, strategy>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal32>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal64>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal128>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal256>>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnDecimal<DateTime64>>>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal32>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal64>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal128>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnDecimal<Decimal256>>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnDecimal<DateTime64>>, strategy>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnVector<UUID>>>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnVector<UUID>>, strategy>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnString>>,
SortingHeap<SpecializedSingleColumnSortCursor<ColumnFixedString>>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnString>, strategy>,
SortingQueueImpl<SpecializedSingleColumnSortCursor<ColumnFixedString>, strategy>,
SortingHeap<SimpleSortCursor>,
SortingHeap<SortCursor>,
SortingHeap<SortCursorWithCollation>>
queue_variants;
SortingQueueImpl<SimpleSortCursor, strategy>,
SortingQueueImpl<SortCursor, strategy>,
SortingQueueImpl<SortCursorWithCollation, strategy>>;
using DefaultQueueVariants = QueueVariants<SortingQueueStrategy::Default>;
using BatchQueueVariants = QueueVariants<SortingQueueStrategy::Batch>;
DefaultQueueVariants default_queue_variants;
BatchQueueVariants batch_queue_variants;
};
template <typename TLeftColumns, typename TRightColumns>

View File

@ -397,6 +397,7 @@ extern "C" void __sanitizer_set_death_callback(void (*)());
static void sanitizerDeathCallback()
{
DENY_ALLOCATIONS_IN_SCOPE;
/// Also need to send data via pipe. Otherwise it may lead to deadlocks or failures in printing diagnostic info.
char buf[signal_pipe_buf_size];

View File

@ -615,7 +615,6 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_makeset_for_
actions = visitor_data.getActions();
}
void ExpressionAnalyzer::getRootActionsNoMakeSet(const ASTPtr & ast, ActionsDAGPtr & actions, bool only_consts)
{
LogAST log;
@ -718,7 +717,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions, Aggr
}
}
void makeWindowDescriptionFromAST(const Context & context,
void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
const WindowDescriptions & existing_descriptions,
WindowDescription & desc, const IAST * ast)
{
@ -787,6 +786,10 @@ void makeWindowDescriptionFromAST(const Context & context,
desc.partition_by.push_back(SortColumnDescription(
with_alias->getColumnName(), 1 /* direction */,
1 /* nulls_direction */));
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(column_ast, false, actions_dag);
desc.partition_by_actions.push_back(std::move(actions_dag));
}
}
@ -804,6 +807,10 @@ void makeWindowDescriptionFromAST(const Context & context,
order_by_element.children.front()->getColumnName(),
order_by_element.direction,
order_by_element.nulls_direction));
auto actions_dag = std::make_shared<ActionsDAG>(columns_after_join);
getRootActions(column_ast, false, actions_dag);
desc.order_by_actions.push_back(std::move(actions_dag));
}
}
@ -830,14 +837,14 @@ void makeWindowDescriptionFromAST(const Context & context,
if (definition.frame_end_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_end_offset,
context.shared_from_this());
context_.shared_from_this());
desc.frame.end_offset = value;
}
if (definition.frame_begin_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_begin_offset,
context.shared_from_this());
context_.shared_from_this());
desc.frame.begin_offset = value;
}
}

View File

@ -140,6 +140,7 @@ public:
/// A list of windows for window functions.
const WindowDescriptions & windowDescriptions() const { return window_descriptions; }
void makeWindowDescriptionFromAST(const Context & context, const WindowDescriptions & existing_descriptions, WindowDescription & desc, const IAST * ast);
void makeWindowDescriptions(ActionsDAGPtr actions);
/**

View File

@ -886,7 +886,7 @@ static FillColumnDescription getWithFillDescription(const ASTOrderByElement & or
return descr;
}
static SortDescription getSortDescription(const ASTSelectQuery & query, ContextPtr context)
SortDescription InterpreterSelectQuery::getSortDescription(const ASTSelectQuery & query, ContextPtr context_)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
@ -900,15 +900,15 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, ContextP
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
if (order_by_elem.with_fill)
{
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context_);
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator, true, fill_desc);
}
else
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
}
order_descr.compile_sort_description = context->getSettingsRef().compile_sort_description;
order_descr.min_count_to_compile_sort_description = context->getSettingsRef().min_count_to_compile_sort_description;
order_descr.compile_sort_description = context_->getSettingsRef().compile_sort_description;
order_descr.min_count_to_compile_sort_description = context_->getSettingsRef().min_count_to_compile_sort_description;
return order_descr;
}
@ -1033,12 +1033,12 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
}
static UInt64 getLimitForSorting(const ASTSelectQuery & query, ContextPtr context)
UInt64 InterpreterSelectQuery::getLimitForSorting(const ASTSelectQuery & query, ContextPtr context_)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY, neither ARRAY JOIN.
if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList().first && query.limitLength())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context_);
if (limit_length > std::numeric_limits<UInt64>::max() - limit_offset)
return 0;

View File

@ -128,6 +128,9 @@ public:
/// It will set shard_num and shard_count to the client_info
void setProperClientInfo(size_t replica_num, size_t replica_count);
static SortDescription getSortDescription(const ASTSelectQuery & query, ContextPtr context);
static UInt64 getLimitForSorting(const ASTSelectQuery & query, ContextPtr context);
private:
InterpreterSelectQuery(
const ASTPtr & query_ptr_,

View File

@ -67,6 +67,7 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess
if (e.code() == ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION)
{
/// Detach transaction from current context if connection was lost and its status is unknown
/// (so it will be possible to start new one)
session_context->setCurrentTransaction(NO_TRANSACTION_PTR);
}
throw;
@ -80,6 +81,16 @@ BlockIO InterpreterTransactionControlQuery::executeCommit(ContextMutablePtr sess
/// It's useful for testing. It allows to enable fault injection (after commit) without breaking tests.
txn->waitStateChange(Tx::CommittingCSN);
CSN csn_changed_state = txn->getCSN();
if (csn_changed_state == Tx::UnknownCSN)
{
/// CommittingCSN -> UnknownCSN -> RolledBackCSN
/// It's possible if connection was lost before commit
/// (maybe we should get rid of intermediate UnknownCSN in this transition)
txn->waitStateChange(Tx::UnknownCSN);
chassert(txn->getCSN() == Tx::RolledBackCSN);
}
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction {} was rolled back", txn->tid);
if (txn->getState() != MergeTreeTransaction::COMMITTED)

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TransactionsInfoLog.h>
#include <Common/noexcept_scope.h>
namespace DB
{
@ -146,8 +147,8 @@ void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataP
std::lock_guard lock{mutex};
checkIsNotCancelled();
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
part_to_remove->version.lockRemovalTID(tid, context);
NOEXCEPT_SCOPE;
storages.insert(storage);
if (maybe_lock)
table_read_locks_for_ordinary_db.emplace_back(std::move(maybe_lock));

View File

@ -16,7 +16,7 @@
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/noexcept_scope.h>
#include <base/errnoToString.h>
#if defined(OS_LINUX)
@ -343,7 +343,7 @@ void ThreadStatus::finalizeQueryProfiler()
void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
{
LockMemoryExceptionInThread lock(VariableContext::Global);
NOEXCEPT_SCOPE;
if (exit_if_already_detached && thread_state == ThreadState::DetachedFromQuery)
{

View File

@ -9,12 +9,9 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/ServerUUID.h>
#include <Common/logger_useful.h>
#include <Common/noexcept_scope.h>
/// It's used in critical places to exit on unexpected exceptions.
/// SIGABRT is usually better that broken state in memory with unpredictable consequences.
#define NOEXCEPT_SCOPE SCOPE_EXIT({ if (std::uncaught_exceptions()) { tryLogCurrentException("NOEXCEPT_SCOPE"); abort(); } })
namespace DB
{
@ -146,8 +143,7 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
}
futures.clear();
NOEXCEPT_SCOPE;
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
NOEXCEPT_SCOPE_STRICT;
{
std::lock_guard lock{mutex};
for (const auto & entry : loaded)
@ -452,7 +448,7 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool
/// Do not allow exceptions between commit point and the and of transaction finalization
/// (otherwise it may stuck in COMMITTING state holding snapshot).
NOEXCEPT_SCOPE;
NOEXCEPT_SCOPE_STRICT;
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use
/// (overflow is possible in a several weeks/months of active usage)
allocated_csn = deserializeCSN(csn_path_created.substr(zookeeper_path_log.size() + 1));

View File

@ -95,12 +95,8 @@ bool VersionMetadata::tryLockRemovalTID(const TransactionID & tid, const Transac
bool locked = removal_tid_lock.compare_exchange_strong(expected_removal_lock_value, removal_lock_value);
if (!locked)
{
if (tid == Tx::PrehistoricTID && expected_removal_lock_value == Tx::PrehistoricTID.getHash())
{
/// Don't need to lock part for queries without transaction
LOG_TEST(log, "Assuming removal_tid is locked by {}, table: {}, part: {}", tid, context.table.getNameForLogs(), context.part_name);
return true;
}
if (expected_removal_lock_value == removal_lock_value)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tried to lock part {} for removal second time by {}", context.part_name, tid);
if (locked_by_id)
*locked_by_id = expected_removal_lock_value;

View File

@ -7,6 +7,7 @@
#include <DataTypes/IDataType.h>
#include <Core/Names.h>
#include <Core/Types.h>
#include <Processors/QueryPlan/FilterStep.h>
namespace DB
{
@ -90,6 +91,9 @@ struct WindowDescription
// then by ORDER BY. This field holds this combined sort order.
SortDescription full_sort_description;
std::vector<ActionsDAGPtr> partition_by_actions;
std::vector<ActionsDAGPtr> order_by_actions;
WindowFrame frame;
// The window functions that are calculated for this window.

View File

@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
header, current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
}
queue = SortingHeap<SortCursor>(cursors);
queue = SortingQueue<SortCursor>(cursors);
}
void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t source_num)

View File

@ -13,7 +13,7 @@ public:
IMergingAlgorithmWithDelayedChunk(Block header_, size_t num_inputs, SortDescription description_);
protected:
SortingHeap<SortCursor> queue;
SortingQueue<SortCursor> queue;
SortDescription description;
/// Previous row. May refer to last_chunk_sort_columns or row from source_chunks.

View File

@ -43,7 +43,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
source.chunk->sort_columns = cursors[source_num].sort_columns;
}
queue = SortingHeap<SortCursor>(cursors);
queue = SortingQueue<SortCursor>(cursors);
}
void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)

View File

@ -36,7 +36,7 @@ protected:
using Sources = std::vector<Source>;
Sources sources;
SortingHeap<SortCursor> queue;
SortingQueue<SortCursor> queue;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution

View File

@ -38,27 +38,41 @@ public:
sum_blocks_granularity += block_size;
}
void insertFromChunk(Chunk && chunk, size_t limit_rows)
void insertRows(const ColumnRawPtrs & raw_columns, size_t start_index, size_t length, size_t block_size)
{
size_t num_columns = raw_columns.size();
for (size_t i = 0; i < num_columns; ++i)
{
if (length == 1)
columns[i]->insertFrom(*raw_columns[i], start_index);
else
columns[i]->insertRangeFrom(*raw_columns[i], start_index, length);
}
total_merged_rows += length;
merged_rows += length;
sum_blocks_granularity += (block_size * length);
}
void insertChunk(Chunk && chunk, size_t rows_size)
{
if (merged_rows)
throw Exception("Cannot insert to MergedData from Chunk because MergedData is not empty.",
ErrorCodes::LOGICAL_ERROR);
auto num_rows = chunk.getNumRows();
UInt64 num_rows = chunk.getNumRows();
columns = chunk.mutateColumns();
if (limit_rows && num_rows > limit_rows)
if (rows_size < num_rows)
{
num_rows = limit_rows;
size_t pop_size = num_rows - rows_size;
for (auto & column : columns)
column = IColumn::mutate(column->cut(0, num_rows));
column->popBack(pop_size);
}
need_flush = true;
total_merged_rows += num_rows;
merged_rows = num_rows;
/// We don't care about granularity here. Because, for fast-forward optimization, chunk will be moved as-is.
/// sum_blocks_granularity += block_size * num_rows;
total_merged_rows += rows_size;
merged_rows = rows_size;
}
Chunk pull()
@ -107,6 +121,7 @@ public:
UInt64 totalMergedRows() const { return total_merged_rows; }
UInt64 totalChunks() const { return total_chunks; }
UInt64 totalAllocatedBytes() const { return total_allocated_bytes; }
UInt64 maxBlockSize() const { return max_block_size; }
protected:
MutableColumns columns;

View File

@ -7,11 +7,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergingSortedAlgorithm::MergingSortedAlgorithm(
Block header_,
size_t num_inputs,
@ -74,7 +69,7 @@ void MergingSortedAlgorithm::initialize(Inputs inputs)
cursors[source_num] = SortCursorImpl(header, chunk.getColumns(), description, source_num);
}
queue_variants.callOnVariant([&](auto & queue)
queue_variants.callOnBatchVariant([&](auto & queue)
{
using QueueType = std::decay_t<decltype(queue)>;
queue = QueueType(cursors);
@ -87,7 +82,7 @@ void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
current_inputs[source_num].swap(input);
cursors[source_num].reset(current_inputs[source_num].chunk.getColumns(), header);
queue_variants.callOnVariant([&](auto & queue)
queue_variants.callOnBatchVariant([&](auto & queue)
{
queue.push(cursors[source_num]);
});
@ -95,16 +90,17 @@ void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
IMergingAlgorithm::Status MergingSortedAlgorithm::merge()
{
IMergingAlgorithm::Status result = queue_variants.callOnVariant([&](auto & queue)
IMergingAlgorithm::Status result = queue_variants.callOnBatchVariant([&](auto & queue)
{
return mergeImpl(queue);
return mergeBatchImpl(queue);
});
return result;
}
template <typename TSortingHeap>
IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue)
template <typename TSortingQueue>
IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue & queue)
{
/// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
while (queue.isValid())
@ -112,64 +108,100 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
if (merged_data.hasEnoughRows())
return Status(merged_data.pull());
auto current = queue.current();
auto [current_ptr, initial_batch_size] = queue.current();
auto current = *current_ptr;
if (current.impl->isLast() && current_inputs[current.impl->order].skip_last_row)
bool batch_skip_last_row = false;
if (current.impl->isLast(initial_batch_size) && current_inputs[current.impl->order].skip_last_row)
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
batch_skip_last_row = true;
if (initial_batch_size == 1)
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
}
/** And what if the block is totally less or equal than the rest for the current cursor?
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
*/
if (current.impl->isFirst()
&& !current_inputs[current.impl->order].skip_last_row /// Ignore optimization if last row should be skipped.
&& (queue.size() == 1
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
{
//std::cerr << "current block is totally less or equals\n";
UInt64 merged_rows = merged_data.mergedRows();
size_t updated_batch_size = initial_batch_size;
/// If there are already data in the current block, we first return it.
/// We'll get here again the next time we call the merge function.
if (merged_rows + updated_batch_size > merged_data.maxBlockSize())
{
batch_skip_last_row = false;
updated_batch_size -= merged_rows + updated_batch_size - merged_data.maxBlockSize();
}
bool limit_reached = false;
if (limit && merged_rows + updated_batch_size > limit)
{
batch_skip_last_row = false;
updated_batch_size -= merged_rows + updated_batch_size - limit;
limit_reached = true;
}
if (unlikely(current.impl->isFirst() && current.impl->isLast(initial_batch_size)))
{
/** This is special optimization if current cursor is totally less than next cursor.
* We want to insert current cursor chunk directly in merged data.
*
* First if merged_data is not empty we need to flush it.
* We will get into the same condition on next mergeBatch call.
*
* Then we can insert chunk directly in merged data.
*/
if (merged_data.mergedRows() != 0)
{
//std::cerr << "merged rows is non-zero\n";
// merged_data.flush();
return Status(merged_data.pull());
size_t source_num = current.impl->order;
size_t insert_rows_size = initial_batch_size - static_cast<size_t>(batch_skip_last_row);
merged_data.insertChunk(std::move(current_inputs[source_num].chunk), insert_rows_size);
current_inputs[source_num].chunk = Chunk();
if (out_row_sources_buf)
{
RowSourcePart row_source(current.impl->order);
for (size_t i = 0; i < insert_rows_size; ++i)
out_row_sources_buf->write(row_source.data);
}
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
if (limit_reached)
break;
/// We will get the next block from the corresponding source, if there is one.
queue.removeTop();
return insertFromChunk(source_num);
auto result = Status(merged_data.pull(), limit_reached);
if (!limit_reached)
result.required_source = source_num;
return result;
}
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
//std::cerr << "Inserting row\n";
merged_data.insertRow(current->all_columns, current->getRow(), current->rows);
size_t insert_rows_size = updated_batch_size - static_cast<size_t>(batch_skip_last_row);
merged_data.insertRows(current->all_columns, current->getRow(), insert_rows_size, current->rows);
if (out_row_sources_buf)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
RowSourcePart row_source(current.impl->order);
out_row_sources_buf->write(row_source.data);
for (size_t i = 0; i < insert_rows_size; ++i)
out_row_sources_buf->write(row_source.data);
}
if (limit && merged_data.totalMergedRows() >= limit)
return Status(merged_data.pull(), true);
if (limit_reached)
break;
if (!current->isLast())
if (!current->isLast(updated_batch_size))
{
//std::cerr << "moving to next row\n";
queue.next();
queue.next(updated_batch_size);
}
else
{
/// We will get the next block from the corresponding source, if there is one.
queue.removeTop();
//std::cerr << "It was last row, fetching next block\n";
return Status(current.impl->order);
}
}
@ -177,43 +209,4 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
return Status(merged_data.pull(), true);
}
IMergingAlgorithm::Status MergingSortedAlgorithm::insertFromChunk(size_t source_num)
{
if (source_num >= cursors.size())
throw Exception("Logical error in MergingSortedTransform", ErrorCodes::LOGICAL_ERROR);
//std::cerr << "copied columns\n";
auto num_rows = current_inputs[source_num].chunk.getNumRows();
UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
bool is_finished = limit && total_merged_rows_after_insertion >= limit;
if (limit && total_merged_rows_after_insertion > limit)
{
num_rows -= total_merged_rows_after_insertion - limit;
merged_data.insertFromChunk(std::move(current_inputs[source_num].chunk), num_rows);
}
else
merged_data.insertFromChunk(std::move(current_inputs[source_num].chunk), 0);
current_inputs[source_num].chunk = Chunk();
/// Write order of rows for other columns
/// this data will be used in gather stream
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
for (size_t i = 0; i < num_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
auto status = Status(merged_data.pull(), is_finished);
if (!is_finished)
status.required_source = source_num;
return status;
}
}

View File

@ -51,10 +51,9 @@ private:
SortQueueVariants queue_variants;
Status insertFromChunk(size_t source_num);
template <typename TSortingQueue>
Status mergeBatchImpl(TSortingQueue & queue);
template <typename TSortingHeap>
Status mergeImpl(TSortingHeap & queue);
};
}

View File

@ -48,15 +48,20 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
/// May split ExpressionStep and lift up only a part of it.
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
/// Utilize storage sorting when sorting for window functions.
/// Update information about prefix sort description in SortingStep.
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 6> optimizations = {{
static const std::array<Optimization, 7> optimizations = {{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan}
}};
return optimizations;

View File

@ -0,0 +1,113 @@
#include <Parsers/ASTWindowDefinition.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/TableJoin.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/IColumn.h>
namespace DB::QueryPlanOptimizations
{
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & /*nodes*/)
{
/// Find the following sequence of steps, add InputOrderInfo and apply prefix sort description to
/// SortingStep:
/// WindowStep <- SortingStep <- [Expression] <- [SettingQuotaAndLimits] <- ReadFromMergeTree
auto * window_node = parent_node;
auto * window = typeid_cast<WindowStep *>(window_node->step.get());
if (!window)
return 0;
if (window_node->children.size() != 1)
return 0;
auto * sorting_node = window_node->children.front();
auto * sorting = typeid_cast<SortingStep *>(sorting_node->step.get());
if (!sorting)
return 0;
if (sorting_node->children.size() != 1)
return 0;
auto * possible_read_from_merge_tree_node = sorting_node->children.front();
if (typeid_cast<ExpressionStep *>(possible_read_from_merge_tree_node->step.get()))
{
if (possible_read_from_merge_tree_node->children.size() != 1)
return 0;
possible_read_from_merge_tree_node = possible_read_from_merge_tree_node->children.front();
}
auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(possible_read_from_merge_tree_node->step.get());
if (!read_from_merge_tree)
{
return 0;
}
auto context = read_from_merge_tree->getContext();
if (!context->getSettings().optimize_read_in_window_order)
{
return 0;
}
const auto & query_info = read_from_merge_tree->getQueryInfo();
const auto * select_query = query_info.query->as<ASTSelectQuery>();
ManyExpressionActions order_by_elements_actions;
const auto & window_desc = window->getWindowDescription();
for (const auto & actions_dag : window_desc.partition_by_actions)
{
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
for (const auto & actions_dag : window_desc.order_by_actions)
{
order_by_elements_actions.emplace_back(
std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
}
auto order_optimizer = std::make_shared<ReadInOrderOptimizer>(
*select_query,
order_by_elements_actions,
window->getWindowDescription().full_sort_description,
query_info.syntax_analyzer_result);
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
auto order_info = order_optimizer->getInputOrder(
query_info.projection ? query_info.projection->desc->metadata : read_from_merge_tree->getStorageMetadata(),
context,
limit);
if (order_info)
{
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
sorting->convertToFinishSorting(order_info->order_key_prefix_descr);
}
return 0;
}
}

View File

@ -982,6 +982,30 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
}
void ReadFromMergeTree::setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> order_optimizer)
{
if (query_info.projection)
{
query_info.projection->order_optimizer = order_optimizer;
}
else
{
query_info.order_optimizer = order_optimizer;
}
}
void ReadFromMergeTree::setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info)
{
if (query_info.projection)
{
query_info.projection->input_order_info = order_info;
}
else
{
query_info.input_order_info = order_info;
}
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
{
auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts);
@ -1065,7 +1089,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
column_names_to_read,
result_projection);
}
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && input_order_info)
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order || settings.optimize_read_in_window_order) && input_order_info)
{
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),

View File

@ -129,6 +129,13 @@ public:
bool sample_factor_column_queried,
Poco::Logger * log);
ContextPtr getContext() const { return context; }
const SelectQueryInfo & getQueryInfo() const { return query_info; }
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
void setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> read_in_order_optimizer);
void setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info);
private:
const MergeTreeReaderSettings reader_settings;

View File

@ -112,6 +112,12 @@ void SortingStep::updateLimit(size_t limit_)
}
}
void SortingStep::convertToFinishSorting(SortDescription prefix_description_)
{
type = Type::FinishSorting;
prefix_description = std::move(prefix_description_);
}
void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (type == Type::FinishSorting)

View File

@ -54,6 +54,8 @@ public:
SortDescription getSortDescription() const { return result_description; }
void convertToFinishSorting(SortDescription prefix_description);
private:
enum class Type

View File

@ -138,4 +138,9 @@ void WindowStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Functions", std::move(functions_array));
}
const WindowDescription & WindowStep::getWindowDescription() const
{
return window_description;
}
}

View File

@ -25,6 +25,8 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const WindowDescription & getWindowDescription() const;
private:
WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;

View File

@ -26,8 +26,11 @@ MergeSorter::MergeSorter(const Block & header, Chunks chunks_, SortDescription &
: chunks(std::move(chunks_)), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), queue_variants(header, description)
{
Chunks nonempty_chunks;
for (auto & chunk : chunks)
size_t chunks_size = chunks.size();
for (size_t chunk_index = 0; chunk_index < chunks_size; ++chunk_index)
{
auto & chunk = chunks[chunk_index];
if (chunk.getNumRows() == 0)
continue;
@ -36,7 +39,7 @@ MergeSorter::MergeSorter(const Block & header, Chunks chunks_, SortDescription &
/// which can be inefficient.
convertToFullIfSparse(chunk);
cursors.emplace_back(header, chunk.getColumns(), description);
cursors.emplace_back(header, chunk.getColumns(), description, chunk_index);
has_collation |= cursors.back().has_collation;
nonempty_chunks.emplace_back(std::move(chunk));
@ -44,7 +47,7 @@ MergeSorter::MergeSorter(const Block & header, Chunks chunks_, SortDescription &
chunks.swap(nonempty_chunks);
queue_variants.callOnVariant([&](auto & queue)
queue_variants.callOnBatchVariant([&](auto & queue)
{
using QueueType = std::decay_t<decltype(queue)>;
queue = QueueType(cursors);
@ -64,17 +67,17 @@ Chunk MergeSorter::read()
return res;
}
Chunk result = queue_variants.callOnVariant([&](auto & queue)
Chunk result = queue_variants.callOnBatchVariant([&](auto & queue)
{
return mergeImpl(queue);
return mergeBatchImpl(queue);
});
return result;
}
template <typename TSortingHeap>
Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
template <typename TSortingQueue>
Chunk MergeSorter::mergeBatchImpl(TSortingQueue & queue)
{
size_t num_columns = chunks[0].getNumColumns();
MutableColumns merged_columns = chunks[0].cloneEmptyColumns();
@ -89,32 +92,46 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
column->reserve(size_to_reserve);
}
/// TODO: Optimization when a single block left.
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
while (queue.isValid())
{
auto current = queue.current();
auto [current_ptr, batch_size] = queue.current();
auto & current = *current_ptr;
/// Append a row from queue.
if (merged_rows + batch_size > max_merged_block_size)
batch_size -= merged_rows + batch_size - max_merged_block_size;
bool limit_reached = false;
if (limit && total_merged_rows + batch_size > limit)
{
batch_size -= total_merged_rows + batch_size - limit;
limit_reached = true;
}
/// Append rows from queue.
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
{
if (batch_size == 1)
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
else
merged_columns[i]->insertRangeFrom(*current->all_columns[i], current->getRow(), batch_size);
}
++total_merged_rows;
++merged_rows;
total_merged_rows += batch_size;
merged_rows += batch_size;
/// We don't need more rows because of limit has reached.
if (limit && total_merged_rows == limit)
if (limit_reached)
{
chunks.clear();
break;
}
queue.next();
queue.next(batch_size);
/// It's enough for current output block but we will continue.
if (merged_rows == max_merged_block_size)
if (merged_rows >= max_merged_block_size)
break;
}

View File

@ -32,10 +32,11 @@ private:
bool has_collation = false;
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortingHeap>
Chunk mergeImpl(TSortingHeap & queue);
template <typename TSortingQueue>
Chunk mergeBatchImpl(TSortingQueue & queue);
};

View File

@ -93,29 +93,24 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
size_t total_rows = 0;
Block block1;
Block block2;
Block block3;
executor.pull(block1);
executor.pull(block2);
executor.pull(block3);
Block tmp_block;
ASSERT_FALSE(executor.pull(tmp_block));
for (const auto & block : {block1, block2, block3})
for (const auto & block : {block1, block2})
total_rows += block.rows();
/**
* First block consists of 1 row from block3 with 21 rows + 2 rows from block2 with 10 rows
* + 5 rows from block 1 with 5 rows granularity
*/
EXPECT_EQ(block1.rows(), 8);
/**
* Combination of 10 and 21 rows blocks
* Second block consists of 8 rows from block2 + 20 rows from block3
*/
EXPECT_EQ(block2.rows(), 14);
/**
* Combination of 10 and 21 rows blocks
*/
EXPECT_EQ(block3.rows(), 14);
EXPECT_EQ(block2.rows(), 28);
EXPECT_EQ(total_rows, 5 + 10 + 21);
}

View File

@ -65,6 +65,7 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/noexcept_scope.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Formats/IInputFormat.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
@ -2907,16 +2908,18 @@ bool MergeTreeData::renameTempPartAndReplace(
part->renameTo(part_name, true);
auto part_it = data_parts_indexes.insert(part).first;
/// FIXME Transactions: it's not the best place for checking and setting removal_tid,
/// because it's too optimistic. We should lock removal_tid of covered parts at the beginning of operation.
MergeTreeTransaction::addNewPartAndRemoveCovered(shared_from_this(), part, covered_parts, txn);
if (out_transaction)
{
chassert(out_transaction->txn == txn);
out_transaction->precommitted_parts.insert(part);
}
else
{
/// FIXME Transactions: it's not the best place for checking and setting removal_tid,
/// because it's too optimistic. We should lock removal_tid of covered parts at the beginning of operation.
MergeTreeTransaction::addNewPartAndRemoveCovered(shared_from_this(), part, covered_parts, txn);
size_t reduce_bytes = 0;
size_t reduce_rows = 0;
size_t reduce_parts = 0;
@ -3579,6 +3582,13 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartiti
return getVisibleDataPartsVectorInPartition(local_context->getCurrentTransaction().get(), partition_id);
}
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(
ContextPtr local_context, const String & partition_id, DataPartsLock & lock) const
{
return getVisibleDataPartsVectorInPartition(local_context->getCurrentTransaction().get(), partition_id, &lock);
}
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(
MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock) const
{
@ -4250,7 +4260,7 @@ void MergeTreeData::restorePartFromBackup(std::shared_ptr<RestoredPartsHolder> r
}
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr local_context) const
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr local_context, DataPartsLock * acquired_lock) const
{
const auto & partition_ast = ast->as<ASTPartition &>();
@ -4334,7 +4344,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
String partition_id = partition.getID(*this);
{
auto data_parts_lock = lockParts();
auto data_parts_lock = (acquired_lock) ? DataPartsLock() : lockParts();
DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{
@ -4922,18 +4932,6 @@ void MergeTreeData::Transaction::rollback()
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
if (!txn)
{
auto lock = data.lockParts();
for (const auto & part : precommitted_parts)
{
DataPartPtr covering_part;
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, lock);
for (auto & covered : covered_parts)
covered->version.unlockRemovalTID(Tx::PrehistoricTID, TransactionInfoContext{data.getStorageID(), covered->name});
}
}
data.removePartsFromWorkingSet(txn,
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
/* clear_without_timeout = */ true);
@ -4951,6 +4949,18 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
auto parts_lock = acquired_parts_lock ? MergeTreeData::DataPartsLock() : data.lockParts();
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
if (txn)
{
for (const DataPartPtr & part : precommitted_parts)
{
DataPartPtr covering_part;
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, *owing_parts_lock);
MergeTreeTransaction::addNewPartAndRemoveCovered(data.shared_from_this(), part, covered_parts, txn);
}
}
NOEXCEPT_SCOPE;
auto current_time = time(nullptr);
size_t add_bytes = 0;
@ -4974,6 +4984,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
}
else
{
if (!txn)
MergeTreeTransaction::addNewPartAndRemoveCovered(data.shared_from_this(), part, covered_parts, NO_TRANSACTION_RAW);
total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end());
for (const auto & covered_part : covered_parts)
{

View File

@ -282,6 +282,7 @@ public:
MergeTreeData & data;
MergeTreeTransaction * txn;
DataParts precommitted_parts;
DataParts locked_parts;
void clear() { precommitted_parts.clear(); }
};
@ -501,6 +502,7 @@ public:
/// Returns all parts in specified partition
DataPartsVector getVisibleDataPartsVectorInPartition(MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id, DataPartsLock & lock) const;
DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const;
DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const;
@ -767,7 +769,7 @@ public:
}
/// For ATTACH/DETACH/DROP PARTITION.
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const;
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context, DataPartsLock * acquired_lock = nullptr) const;
std::unordered_set<String> getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const;
std::set<String> getPartitionIdsAffectedByCommands(const MutationCommands & commands, ContextPtr query_context) const;

View File

@ -4,7 +4,7 @@
namespace DB
{
int ReplicatedMergeTreeAltersSequence::getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const
int ReplicatedMergeTreeAltersSequence::getHeadAlterVersion(std::unique_lock<std::mutex> & /*state_lock*/) const
{
/// If queue empty, than we don't have version
if (!queue_state.empty())
@ -66,7 +66,7 @@ void ReplicatedMergeTreeAltersSequence::finishDataAlter(int alter_version, std::
}
}
bool ReplicatedMergeTreeAltersSequence::canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
bool ReplicatedMergeTreeAltersSequence::canExecuteDataAlter(int alter_version, std::unique_lock<std::mutex> & /*state_lock*/) const
{
/// Queue maybe empty when we start after server shutdown
/// and have some MUTATE_PART records in queue
@ -80,7 +80,7 @@ bool ReplicatedMergeTreeAltersSequence::canExecuteDataAlter(int alter_version, s
return queue_state.at(alter_version).metadata_finished;
}
bool ReplicatedMergeTreeAltersSequence::canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
bool ReplicatedMergeTreeAltersSequence::canExecuteMetaAlter(int alter_version, std::unique_lock<std::mutex> & /*state_lock*/) const
{
assert(!queue_state.empty());

View File

@ -49,13 +49,13 @@ public:
void finishDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/);
/// Check that we can execute this data alter. If it's metadata stage finished.
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const;
bool canExecuteDataAlter(int alter_version, std::unique_lock<std::mutex> & /*state_lock*/) const;
/// Check that we can execute metadata alter with version.
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const;
bool canExecuteMetaAlter(int alter_version, std::unique_lock<std::mutex> & /*state_lock*/) const;
/// Just returns smallest alter version in sequence (first entry)
int getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const;
int getHeadAlterVersion(std::unique_lock<std::mutex> & /*state_lock*/) const;
};
}

View File

@ -1066,8 +1066,9 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
}
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const LogEntry & entry, const String & new_part_name,
String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
bool ReplicatedMergeTreeQueue::isCoveredByFuturePartsImpl(const LogEntry & entry, const String & new_part_name,
String & out_reason, std::unique_lock<std::mutex> & /* queue_lock */,
std::vector<LogEntryPtr> * covered_entries_to_wait) const
{
/// Let's check if the same part is now being created by another action.
auto entry_for_same_part_it = future_parts.find(new_part_name);
@ -1080,7 +1081,7 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const LogEntry & en
entry.znode_name, entry.type, entry.new_part_name,
another_entry.znode_name, another_entry.type, another_entry.new_part_name);
LOG_INFO(log, fmt::runtime(out_reason));
return false;
return true;
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
* and queue element will be processed.
@ -1098,24 +1099,50 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const LogEntry & en
{
auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version);
if (future_part.contains(result_part))
if (future_part.isDisjoint(result_part))
continue;
/// Parts are not disjoint, so new_part_name either contains or covers future_part.
chassert(future_part.contains(result_part) || result_part.contains(future_part));
/// We cannot execute `entry` (or upgrade its actual_part_name to `new_part_name`)
/// while any covered or covering parts are processed.
/// But we also cannot simply return true and postpone entry processing, because it may lead to kind of livelock.
/// Since queue is processed in multiple threads, it's likely that there will be at least one thread
/// executing faulty entry for some small part, so bigger covering part will never be processed.
/// That's why it's better to wait for covered entry to be executed (does not matter successfully or not)
/// instead of exiting and postponing covering entry.
if (covered_entries_to_wait)
{
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is covered by part {} that is currently executing.",
entry.znode_name, new_part_name, future_part_elem.first);
LOG_TRACE(log, fmt::runtime(out_reason));
return false;
if (entry.znode_name < future_part_elem.second->znode_name)
{
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing and another entry {} is newer.",
entry.znode_name, new_part_name, future_part_elem.first, future_part_elem.second->znode_name);
LOG_TRACE(log, fmt::runtime(out_reason));
return true;
}
covered_entries_to_wait->push_back(future_part_elem.second);
continue;
}
out_reason = fmt::format(
"Not executing log entry {} for part {} "
"because it is not disjoint with part {} that is currently executing.",
entry.znode_name, new_part_name, future_part_elem.first);
LOG_TRACE(log, fmt::runtime(out_reason));
return true;
}
return true;
return false;
}
bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason)
{
/// We have found `part_name` on some replica and are going to fetch it instead of covered `entry->new_part_name`.
std::lock_guard lock(state_mutex);
std::unique_lock lock(state_mutex);
if (virtual_parts.getContainingPart(part_name).empty())
{
@ -1137,13 +1164,13 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
if (drop_ranges.isAffectedByDropRange(part_name, reject_reason))
return false;
if (isNotCoveredByFuturePartsImpl(entry, part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this, lock);
return true;
}
std::vector<LogEntryPtr> covered_entries_to_wait;
if (isCoveredByFuturePartsImpl(entry, part_name, reject_reason, lock, &covered_entries_to_wait))
return false;
CurrentlyExecuting::setActualPartName(entry, part_name, *this, lock, covered_entries_to_wait);
return true;
return false;
}
@ -1152,13 +1179,15 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
String & out_postpone_reason,
MergeTreeDataMergerMutator & merger_mutator,
MergeTreeData & data,
std::lock_guard<std::mutex> & state_lock) const
std::unique_lock<std::mutex> & state_lock) const
{
/// If our entry produce part which is already covered by
/// some other entry which is currently executing, then we can postpone this entry.
for (const String & new_part_name : entry.getVirtualPartNames(format_version))
{
if (!isNotCoveredByFuturePartsImpl(entry, new_part_name, out_postpone_reason, state_lock))
/// Do not wait for any entries here, because we have only one thread that scheduling queue entries.
/// We can wait in worker threads, but not in scheduler.
if (isCoveredByFuturePartsImpl(entry, new_part_name, out_postpone_reason, state_lock, /* covered_entries_to_wait */ nullptr))
return false;
}
@ -1409,7 +1438,7 @@ Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partiti
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::lock_guard<std::mutex> & /* state_lock */)
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::unique_lock<std::mutex> & /* state_lock */)
: entry(entry_), queue(queue_)
{
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
@ -1435,7 +1464,8 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(
ReplicatedMergeTreeQueue::LogEntry & entry,
const String & actual_part_name,
ReplicatedMergeTreeQueue & queue,
std::lock_guard<std::mutex> & /* state_lock */)
std::unique_lock<std::mutex> & state_lock,
std::vector<LogEntryPtr> & covered_entries_to_wait)
{
if (!entry.actual_new_part_name.empty())
throw Exception("Entry actual part isn't empty yet. This is a bug.", ErrorCodes::LOGICAL_ERROR);
@ -1450,6 +1480,15 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
"It happened on attempt to execute {}: {}",
entry.actual_new_part_name, entry.znode_name, entry.toString());
for (LogEntryPtr & covered_entry : covered_entries_to_wait)
{
if (&entry == covered_entry.get())
continue;
LOG_TRACE(queue.log, "Waiting for {} producing {} to finish before executing {} producing not disjoint part {}",
covered_entry->znode_name, covered_entry->new_part_name, entry.znode_name, entry.new_part_name);
covered_entry->execution_complete.wait(state_lock, [&covered_entry] { return !covered_entry->currently_executing; });
}
}
@ -1491,7 +1530,7 @@ ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntry
{
LogEntryPtr entry;
std::lock_guard lock(state_mutex);
std::unique_lock lock(state_mutex);
for (auto it = queue.begin(); it != queue.end(); ++it)
{

View File

@ -202,17 +202,18 @@ private:
bool shouldExecuteLogEntry(
const LogEntry & entry, String & out_postpone_reason,
MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data,
std::lock_guard<std::mutex> & state_lock) const;
std::unique_lock<std::mutex> & state_lock) const;
Int64 getCurrentMutationVersionImpl(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const;
/** Check that part isn't in currently generating parts and isn't covered by them.
* Should be called under state_mutex.
*/
bool isNotCoveredByFuturePartsImpl(
bool isCoveredByFuturePartsImpl(
const LogEntry & entry,
const String & new_part_name, String & out_reason,
std::lock_guard<std::mutex> & state_lock) const;
std::unique_lock<std::mutex> & state_lock,
std::vector<LogEntryPtr> * covered_entries_to_wait) const;
/// After removing the queue element, update the insertion times in the RAM. Running under state_mutex.
/// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
@ -254,14 +255,15 @@ private:
CurrentlyExecuting(
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_,
ReplicatedMergeTreeQueue & queue_,
std::lock_guard<std::mutex> & state_lock);
std::unique_lock<std::mutex> & state_lock);
/// In case of fetch, we determine actual part during the execution, so we need to update entry. It is called under state_mutex.
static void setActualPartName(
ReplicatedMergeTreeQueue::LogEntry & entry,
const String & actual_part_name,
ReplicatedMergeTreeQueue & queue,
std::lock_guard<std::mutex> & state_lock);
std::unique_lock<std::mutex> & state_lock,
std::vector<LogEntryPtr> & covered_entries_to_wait);
public:
~CurrentlyExecuting();

View File

@ -278,8 +278,9 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
auto parts_to_remove = getVisibleDataPartsVector(local_context);
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true);
auto data_parts_lock = lockParts();
auto parts_to_remove = getVisibleDataPartsVectorUnlocked(local_context, data_parts_lock);
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true, data_parts_lock);
LOG_INFO(log, "Removed {} parts.", parts_to_remove.size());
}
@ -1469,16 +1470,17 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
auto data_parts_lock = lockParts();
const auto * partition_ast = partition->as<ASTPartition>();
if (partition_ast && partition_ast->all)
parts_to_remove = getVisibleDataPartsVector(local_context);
parts_to_remove = getVisibleDataPartsVectorUnlocked(local_context, data_parts_lock);
else
{
String partition_id = getPartitionIDFromQuery(partition, local_context);
parts_to_remove = getVisibleDataPartsVectorInPartition(local_context, partition_id);
String partition_id = getPartitionIDFromQuery(partition, local_context, &data_parts_lock);
parts_to_remove = getVisibleDataPartsVectorInPartition(local_context, partition_id, data_parts_lock);
}
/// TODO should we throw an exception if parts_to_remove is empty?
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true);
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true, data_parts_lock);
}
dropPartsImpl(std::move(parts_to_remove), detach);

View File

@ -43,7 +43,9 @@
"A generator, client, and checker for a set test."
[opts]
{:client (CounterClient. nil nil)
:checker (checker/counter)
:checker (checker/compose
{:counter (checker/counter)
:perf (checker/perf)})
:generator (->> (range)
(map (fn [x]
(->> (gen/mix [r add])))))

View File

@ -59,7 +59,8 @@
{:client (QueueClient. nil nil)
:checker (checker/compose
{:total-queue (checker/total-queue)
:timeline (timeline/html)})
:perf (checker/perf)
:timeline (timeline/html)})
:generator (->> (sorted-str-range 50000)
(map (fn [x]
(rand-nth [{:type :invoke, :f :enqueue :value x}
@ -72,6 +73,7 @@
:checker (checker/compose
{:linear (checker/linearizable {:model (model/unordered-queue)
:algorithm :linear})
:perf (checker/perf)
:timeline (timeline/html)})
:generator (->> (sorted-str-range 10000)
(map (fn [x]

View File

@ -55,6 +55,7 @@
(checker/compose
{:linear (checker/linearizable {:model (model/cas-register)
:algorithm :linear})
:perf (checker/perf)
:timeline (timeline/html)}))
:generator (independent/concurrent-generator
10

View File

@ -44,7 +44,9 @@
"A generator, client, and checker for a set test."
[opts]
{:client (SetClient. "/a-set" nil nil)
:checker (checker/set)
:checker (checker/compose
{:set (checker/set)
:perf (checker/perf)})
:generator (->> (range)
(map (fn [x] {:type :invoke, :f :add, :value x})))
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})

View File

@ -36,7 +36,9 @@
"A generator, client, and checker for a set test."
[opts]
{:client (UniqueClient. nil nil)
:checker (checker/unique-ids)
:checker (checker/compose
{:perf (checker/perf)
:unique (checker/unique-ids)})
:generator (->>
(range)
(map (fn [_] {:type :invoke, :f :generate})))})

View File

@ -1,4 +1,4 @@
a b 1
a \N 2
a \N 1
a \N 2
a b 1

View File

@ -8,7 +8,7 @@ INSERT INTO rollup_having VALUES (NULL, NULL);
INSERT INTO rollup_having VALUES ('a', NULL);
INSERT INTO rollup_having VALUES ('a', 'b');
SELECT a, b, count(*) FROM rollup_having GROUP BY a, b WITH ROLLUP HAVING a IS NOT NULL ORDER BY a, b;
SELECT a, b, count(*) FROM rollup_having GROUP BY a, b WITH ROLLUP HAVING a IS NOT NULL and b IS NOT NULL ORDER BY a, b;
SELECT a, b, count(*) as count FROM rollup_having GROUP BY a, b WITH ROLLUP HAVING a IS NOT NULL ORDER BY a, b, count;
SELECT a, b, count(*) as count FROM rollup_having GROUP BY a, b WITH ROLLUP HAVING a IS NOT NULL and b IS NOT NULL ORDER BY a, b, count;
DROP TABLE rollup_having;

View File

@ -18,9 +18,7 @@ function thread_insert()
{
set -e
val=1
trap "STOP_THE_LOOP=1" INT
STOP_THE_LOOP=0
while [[ $STOP_THE_LOOP != 1 ]]; do
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($val, 1) */ ($val, 1);
@ -93,9 +91,7 @@ function thread_partition_dst_to_src()
function thread_select()
{
set -e
trap "STOP_THE_LOOP=1" INT
STOP_THE_LOOP=0
while [[ $STOP_THE_LOOP != 1 ]]; do
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
-- no duplicates
@ -122,9 +118,10 @@ thread_partition_src_to_dst & PID_3=$!
thread_partition_dst_to_src & PID_4=$!
wait $PID_3 && wait $PID_4
kill -INT $PID_1
kill -INT $PID_2
kill -TERM $PID_1
kill -TERM $PID_2
wait
wait_for_queries_to_finish
$CLICKHOUSE_CLIENT -q "SELECT type, count(n) = countDistinct(n) FROM merge(currentDatabase(), '') GROUP BY type ORDER BY type"
$CLICKHOUSE_CLIENT -q "SELECT DISTINCT arraySort(groupArrayIf(n, type=1)) = arraySort(groupArrayIf(n, type=2)) FROM merge(currentDatabase(), '') GROUP BY _table ORDER BY _table"

View File

@ -50,9 +50,7 @@ function thread_insert_rollback()
function thread_optimize()
{
set -e
trap "STOP_THE_LOOP=1" INT
STOP_THE_LOOP=0
while [[ $STOP_THE_LOOP != 1 ]]; do
while true; do
optimize_query="OPTIMIZE TABLE src"
partition_id=$(( RANDOM % 2 ))
if (( RANDOM % 2 )); then
@ -82,7 +80,6 @@ function thread_optimize()
function thread_select()
{
set -e
trap "exit 0" INT
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
@ -103,9 +100,7 @@ function thread_select()
function thread_select_insert()
{
set -e
trap "STOP_THE_LOOP=1" INT
STOP_THE_LOOP=0
while [[ $STOP_THE_LOOP != 1 ]]; do
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT throwIf((SELECT count() FROM tmp) != 0) FORMAT Null;
@ -139,12 +134,13 @@ thread_select & PID_7=$!
thread_select_insert & PID_8=$!
wait $PID_1 && wait $PID_2 && wait $PID_3
kill -INT $PID_4
kill -INT $PID_5
kill -INT $PID_6
kill -INT $PID_7
kill -INT $PID_8
kill -TERM $PID_4
kill -TERM $PID_5
kill -TERM $PID_6
kill -TERM $PID_7
kill -TERM $PID_8
wait
wait_for_queries_to_finish
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;

View File

@ -35,9 +35,7 @@ function thread_insert_rollback()
function thread_select()
{
trap "STOP_THE_LOOP=1" INT
STOP_THE_LOOP=0
while [[ $STOP_THE_LOOP != 1 ]]; do
while true; do
# Result of `uniq | wc -l` must be 1 if the first and the last queries got the same result
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
@ -55,8 +53,9 @@ thread_insert_commit 2 & PID_2=$!
thread_insert_rollback 3 & PID_3=$!
thread_select & PID_4=$!
wait $PID_1 && wait $PID_2 && wait $PID_3
kill -INT $PID_4
kill -TERM $PID_4
wait
wait_for_queries_to_finish
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;

View File

@ -0,0 +1,12 @@
Partial sorting plan
optimize_read_in_window_order=0
Sort description: n ASC, x ASC
optimize_read_in_window_order=1
Prefix sort description: n ASC
Result sort description: n ASC, x ASC
No sorting plan
optimize_read_in_window_order=0
Sort description: n ASC, x ASC
optimize_read_in_window_order=1
Prefix sort description: n ASC, x ASC
Result sort description: n ASC, x ASC

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
name=test_01655_plan_optimizations_optimize_read_in_window_order
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n_x"
$CLICKHOUSE_CLIENT -q "create table ${name} engine=MergeTree order by tuple() as select toInt64((sin(number)+2)*65535)%10 as n, number as x from numbers_mt(100000)"
$CLICKHOUSE_CLIENT -q "create table ${name}_n engine=MergeTree order by n as select * from ${name} order by n"
$CLICKHOUSE_CLIENT -q "create table ${name}_n_x engine=MergeTree order by (n, x) as select * from ${name} order by n, x"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n final"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n_x final"
echo 'Partial sorting plan'
echo ' optimize_read_in_window_order=0'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=0" | grep -i "sort description"
echo ' optimize_read_in_window_order=1'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=1" | grep -i "sort description"
echo 'No sorting plan'
echo ' optimize_read_in_window_order=0'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=0" | grep -i "sort description"
echo ' optimize_read_in_window_order=1'
$CLICKHOUSE_CLIENT -q "explain plan actions=1, description=1 select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1" | grep -i "sort description"
$CLICKHOUSE_CLIENT -q "drop table ${name}"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n_x"

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
name=test_01655_plan_optimizations_optimize_read_in_window_order_long
max_memory_usage=20000000
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table if exists ${name}_n_x"
$CLICKHOUSE_CLIENT -q "create table ${name} engine=MergeTree order by tuple() as select toInt64((sin(number)+2)*65535)%500 as n, number as x from numbers_mt(5000000)"
$CLICKHOUSE_CLIENT -q "create table ${name}_n engine=MergeTree order by n as select * from ${name} order by n"
$CLICKHOUSE_CLIENT -q "create table ${name}_n_x engine=MergeTree order by (n, x) as select * from ${name} order by n, x"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n final"
$CLICKHOUSE_CLIENT -q "optimize table ${name}_n_x final"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=0, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n ORDER BY x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null"
$CLICKHOUSE_CLIENT -q "select n, sum(x) OVER (PARTITION BY n+x%2 ORDER BY n, x ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) from ${name}_n_x SETTINGS optimize_read_in_window_order=1, max_memory_usage=$max_memory_usage, max_threads=1 format Null" 2>&1 | grep -F -q "MEMORY_LIMIT_EXCEEDED" && echo 'OK' || echo 'FAIL'
$CLICKHOUSE_CLIENT -q "drop table ${name}"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n"
$CLICKHOUSE_CLIENT -q "drop table ${name}_n_x"

View File

@ -47,15 +47,15 @@ all_2_2_0 u Default
174250
======
174250
58413
57920
57917
57920
58413
174250
======
174250
58413
57920
57917
57920
58413
174250
======
508413
57920

View File

@ -47,9 +47,9 @@ SELECT id, u, s FROM remote('127.0.0.{1,2}', currentDatabase(), t_sparse_full) O
SELECT '======';
SELECT sum(u) FROM t_sparse_full GROUP BY id % 3 AS k WITH TOTALS ORDER BY k;
SELECT '======';
SELECT sum(u) FROM t_sparse_full GROUP BY id % 3 AS k WITH ROLLUP ORDER BY k;
SELECT sum(u) AS value FROM t_sparse_full GROUP BY id % 3 AS k WITH ROLLUP ORDER BY value;
SELECT '======';
SELECT sum(u) FROM t_sparse_full GROUP BY id % 3 AS k WITH CUBE ORDER BY k;
SELECT sum(u) AS value FROM t_sparse_full GROUP BY id % 3 AS k WITH CUBE ORDER BY value;
SELECT '======';
SELECT sum(id) FROM t_sparse_full GROUP BY u % 3 AS k ORDER BY k;
SELECT '======';

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-parallel, no-backward-compatibility-check
# Tags: disabled
# Tag: no-parallel - to heavy
# Tag: long - to heavy

View File

@ -44,7 +44,7 @@ function check_replication_consistency()
num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do
sleep 0.5;
num_tries=$((num_tries-1))
num_tries=$((num_tries+1))
if [ $num_tries -eq 100 ]; then
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical"
break

View File

@ -129,3 +129,17 @@ function clickhouse_client_removed_host_parameter()
# bash regex magic is arcane, but version dependant and weak; sed or awk are not really portable.
$(echo "$CLICKHOUSE_CLIENT" | python3 -c "import sys, re; print(re.sub('--host(\s+|=)[^\s]+', '', sys.stdin.read()))") "$@"
}
function wait_for_queries_to_finish()
{
# Wait for all queries to finish (query may still be running if thread is killed by timeout)
num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query NOT LIKE '%system.processes%'") -ne 0 ]]; do
sleep 0.5;
num_tries=$((num_tries+1))
if [ $num_tries -eq 20 ]; then
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query NOT LIKE '%system.processes%' FORMAT Vertical"
break
fi
done
}

View File

@ -4,6 +4,7 @@ v22.4.5.9-stable 2022-05-06
v22.4.4.7-stable 2022-04-29
v22.4.3.3-stable 2022-04-26
v22.4.2.1-stable 2022-04-22
v22.3.7.28-lts 2022-06-20
v22.3.6.5-lts 2022-05-06
v22.3.5.5-lts 2022-04-29
v22.3.4.20-lts 2022-04-26

1 v22.6.1.1985-stable 2022-06-16
4 v22.4.4.7-stable 2022-04-29
5 v22.4.3.3-stable 2022-04-26
6 v22.4.2.1-stable 2022-04-22
7 v22.3.7.28-lts 2022-06-20
8 v22.3.6.5-lts 2022-05-06
9 v22.3.5.5-lts 2022-04-29
10 v22.3.4.20-lts 2022-04-26