diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index d8e10c8a173..ac31588d210 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1350,7 +1350,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe>
                context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
- sorting_step->setStepDescription("Sort before JOIN");
+ sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", is_right ? "right" : "left"));
plan.addStep(std::move(sorting_step));
};
if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
{
const auto & join_clause = expressions.join->getTableJoin().getOnlyClause();
- add_sorting(query_plan, join_clause.key_names_left);
- add_sorting(*joined_plan, join_clause.key_names_right);
+ add_sorting(query_plan, join_clause.key_names_left, false);
+ add_sorting(*joined_plan, join_clause.key_names_right, true);
}
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp
index f79be6a67e0..c3152f31808 100644
--- a/src/Interpreters/PartLog.cpp
+++ b/src/Interpreters/PartLog.cpp
@@ -25,17 +25,32 @@ PartLogElement::MergeReasonType PartLogElement::getMergeReasonType(MergeType mer
{
switch (merge_type)
{
- case MergeType::Regular:
- return REGULAR_MERGE;
- case MergeType::TTLDelete:
- return TTL_DELETE_MERGE;
- case MergeType::TTLRecompress:
- return TTL_RECOMPRESS_MERGE;
+ case MergeType::Regular:
+ return REGULAR_MERGE;
+ case MergeType::TTLDelete:
+ return TTL_DELETE_MERGE;
+ case MergeType::TTLRecompress:
+ return TTL_RECOMPRESS_MERGE;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast<UInt64>(merge_type));
}
+PartLogElement::PartMergeAlgorithm PartLogElement::getMergeAlgorithm(MergeAlgorithm merge_algorithm_)
+{
+ switch (merge_algorithm_)
+ {
+ case MergeAlgorithm::Undecided:
+ return UNDECIDED;
+ case MergeAlgorithm::Horizontal:
+ return HORIZONTAL;
+ case MergeAlgorithm::Vertical:
+ return VERTICAL;
+ }
+
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeAlgorithm {}", static_cast<UInt64>(merge_algorithm_));
+}
+
NamesAndTypesList PartLogElement::getNamesAndTypes()
{
auto event_type_datatype = std::make_shared<DataTypeEnum8>(
@@ -60,12 +75,22 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
}
);
+ auto merge_algorithm_datatype = std::make_shared<DataTypeEnum8>(
+ DataTypeEnum8::Values
+ {
+ {"Undecided", static_cast<Int8>(UNDECIDED)},
+ {"Horizontal", static_cast<Int8>(HORIZONTAL)},
+ {"Vertical", static_cast<Int8>(VERTICAL)},
+ }
+ );
+
ColumnsWithTypeAndName columns_with_type_and_name;
return {
{"query_id", std::make_shared<DataTypeString>()},
{"event_type", std::move(event_type_datatype)},
{"merge_reason", std::move(merge_reason_datatype)},
+ {"merge_algorithm", std::move(merge_algorithm_datatype)},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
@@ -104,6 +129,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(query_id);
columns[i++]->insert(event_type);
columns[i++]->insert(merge_reason);
+ columns[i++]->insert(merge_algorithm);
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h
index 16a7e37ee9d..2ce0dfd76de 100644
--- a/src/Interpreters/PartLog.h
+++ b/src/Interpreters/PartLog.h
@@ -5,6 +5,7 @@
#include
#include
#include
+#include <Storages/MergeTree/MergeAlgorithm.h>
namespace DB
@@ -22,6 +23,14 @@ struct PartLogElement
MOVE_PART = 6,
};
+ /// Copy of MergeAlgorithm since values are written to disk.
+ enum PartMergeAlgorithm
+ {
+ UNDECIDED = 0,
+ VERTICAL = 1,
+ HORIZONTAL = 2,
+ };
+
enum MergeReasonType
{
/// merge_reason is relevant only for event_type = 'MERGE_PARTS', in other cases it is NOT_A_MERGE
@@ -38,6 +47,7 @@ struct PartLogElement
Type event_type = NEW_PART;
MergeReasonType merge_reason = NOT_A_MERGE;
+ PartMergeAlgorithm merge_algorithm = UNDECIDED;
time_t event_time = 0;
Decimal64 event_time_microseconds = 0;
@@ -72,6 +82,8 @@ struct PartLogElement
static std::string name() { return "PartLog"; }
static MergeReasonType getMergeReasonType(MergeType merge_type);
+ static PartMergeAlgorithm getMergeAlgorithm(MergeAlgorithm merge_algorithm_);
+
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
diff --git a/src/Loggers/OwnSplitChannel.cpp b/src/Loggers/OwnSplitChannel.cpp
index 71be8007d85..355b733b624 100644
--- a/src/Loggers/OwnSplitChannel.cpp
+++ b/src/Loggers/OwnSplitChannel.cpp
@@ -24,7 +24,7 @@ void OwnSplitChannel::log(const Poco::Message & msg)
#ifdef WITH_TEXT_LOG
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
- if (channels.empty() && (logs_queue == nullptr || msg.getPriority() > logs_queue->max_priority))
+ if (channels.empty() && (logs_queue == nullptr || !logs_queue->isNeeded(msg.getPriority(), msg.getSource())))
return;
#endif
@@ -93,7 +93,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
/// Log to "TCP queue" if message is not too noisy
- if (logs_queue && msg.getPriority() <= logs_queue->max_priority)
+ if (logs_queue && logs_queue->isNeeded(msg.getPriority(), msg.getSource()))
{
MutableColumns columns = InternalTextLogsQueue::getSampleColumns();
diff --git a/src/Processors/Executors/ExecutionThreadContext.cpp b/src/Processors/Executors/ExecutionThreadContext.cpp
index 5a5c1826c61..7631cb09f61 100644
--- a/src/Processors/Executors/ExecutionThreadContext.cpp
+++ b/src/Processors/Executors/ExecutionThreadContext.cpp
@@ -71,7 +71,13 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
bool ExecutionThreadContext::executeTask()
{
- OpenTelemetrySpanHolder span("ExecutionThreadContext::executeTask() " + node->processor->getName());
+ std::unique_ptr<OpenTelemetrySpanHolder> span;
+
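+ /// The span is created only when the opentelemetry_trace_processors setting is enabled.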
+ if (trace_processors)
+ {
+ span = std::make_unique<OpenTelemetrySpanHolder>("ExecutionThreadContext::executeTask() " + node->processor->getName());
+ span->addAttribute("thread_number", thread_number);
+ }
std::optional<Stopwatch> execution_time_watch;
#ifndef NDEBUG
@@ -93,17 +99,16 @@ bool ExecutionThreadContext::executeTask()
if (profile_processors)
{
- UInt64 elapsed_microseconds = execution_time_watch->elapsedMicroseconds();
- node->processor->elapsed_us += elapsed_microseconds;
- span.addAttribute("execution_time_ms", elapsed_microseconds);
- }
+ UInt64 elapsed_microseconds = execution_time_watch->elapsedMicroseconds();
+ node->processor->elapsed_us += elapsed_microseconds;
+ if (trace_processors)
+ span->addAttribute("execution_time_ms", elapsed_microseconds);
+ }
#ifndef NDEBUG
execution_time_ns += execution_time_watch->elapsed();
- span.addAttribute("execution_time_ns", execution_time_watch->elapsed());
+ if (trace_processors)
+ span->addAttribute("execution_time_ns", execution_time_watch->elapsed());
#endif
-
- span.addAttribute("thread_number", thread_number);
-
return node->exception == nullptr;
}
diff --git a/src/Processors/Executors/ExecutionThreadContext.h b/src/Processors/Executors/ExecutionThreadContext.h
index f0341333117..eb048f8ab09 100644
--- a/src/Processors/Executors/ExecutionThreadContext.h
+++ b/src/Processors/Executors/ExecutionThreadContext.h
@@ -41,6 +41,7 @@ public:
const size_t thread_number;
const bool profile_processors;
+ const bool trace_processors;
void wait(std::atomic_bool & finished);
void wakeUp();
@@ -61,10 +62,11 @@ public:
void setException(std::exception_ptr exception_) { exception = exception_; }
void rethrowExceptionIfHas();
- explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, ReadProgressCallback * callback)
+ explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_, bool trace_processors_, ReadProgressCallback * callback)
: read_progress_callback(callback)
, thread_number(thread_number_)
, profile_processors(profile_processors_)
+ , trace_processors(trace_processors_)
{}
};
diff --git a/src/Processors/Executors/ExecutorTasks.cpp b/src/Processors/Executors/ExecutorTasks.cpp
index f2287e467dc..824b4e962d2 100644
--- a/src/Processors/Executors/ExecutorTasks.cpp
+++ b/src/Processors/Executors/ExecutorTasks.cpp
@@ -128,7 +128,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
}
}
-void ExecutorTasks::init(size_t num_threads_, bool profile_processors, ReadProgressCallback * callback)
+void ExecutorTasks::init(size_t num_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback)
{
num_threads = num_threads_;
threads_queue.init(num_threads);
@@ -139,7 +139,7 @@ void ExecutorTasks::init(size_t num_threads_, bool profile_processors, ReadProgr
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
- executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, callback));
+ executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors, trace_processors, callback));
}
}
diff --git a/src/Processors/Executors/ExecutorTasks.h b/src/Processors/Executors/ExecutorTasks.h
index caff1a35d98..668470e7b11 100644
--- a/src/Processors/Executors/ExecutorTasks.h
+++ b/src/Processors/Executors/ExecutorTasks.h
@@ -54,7 +54,7 @@ public:
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
- void init(size_t num_threads_, bool profile_processors, ReadProgressCallback * callback);
+ void init(size_t num_threads_, bool profile_processors, bool trace_processors, ReadProgressCallback * callback);
void fill(Queue & queue);
void processAsyncTasks();
diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp
index cccd08b2273..68225d73ff1 100644
--- a/src/Processors/Executors/PipelineExecutor.cpp
+++ b/src/Processors/Executors/PipelineExecutor.cpp
@@ -29,8 +29,10 @@ PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem)
: process_list_element(elem)
{
if (process_list_element)
+ {
profile_processors = process_list_element->getContext()->getSettingsRef().log_processors_profiles;
-
+ trace_processors = process_list_element->getContext()->getSettingsRef().opentelemetry_trace_processors;
+ }
try
{
graph = std::make_unique<ExecutingGraph>(processors, profile_processors);
@@ -268,7 +270,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
Queue queue;
graph->initializeExecution(queue);
- tasks.init(num_threads, profile_processors, read_progress_callback.get());
+ tasks.init(num_threads, profile_processors, trace_processors, read_progress_callback.get());
tasks.fill(queue);
}
diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h
index 80ba21a8adf..c4d11ef688d 100644
--- a/src/Processors/Executors/PipelineExecutor.h
+++ b/src/Processors/Executors/PipelineExecutor.h
@@ -65,6 +65,8 @@ private:
bool is_execution_initialized = false;
/// system.processors_profile_log
bool profile_processors = false;
+ /// system.opentelemetry_span_log
+ bool trace_processors = false;
std::atomic_bool cancelled = false;
diff --git a/src/Processors/QueryPlan/JoinStep.cpp b/src/Processors/QueryPlan/JoinStep.cpp
index d7ccf003937..909933fbed2 100644
--- a/src/Processors/QueryPlan/JoinStep.cpp
+++ b/src/Processors/QueryPlan/JoinStep.cpp
@@ -48,18 +48,30 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
&processors);
}
+bool JoinStep::allowPushDownToRight() const
+{
+ return join->pipelineType() == JoinPipelineType::YShaped;
+}
+
void JoinStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
-void JoinStep::updateLeftStream(const DataStream & left_stream_)
+void JoinStep::updateInputStream(const DataStream & new_input_stream_, size_t idx)
{
- input_streams = {left_stream_, input_streams.at(1)};
- output_stream = DataStream
+ if (idx == 0)
{
- .header = JoiningTransform::transformHeader(left_stream_.header, join),
- };
+ input_streams = {new_input_stream_, input_streams.at(1)};
+ output_stream = DataStream
+ {
+ .header = JoiningTransform::transformHeader(new_input_stream_.header, join),
+ };
+ }
+ else
+ {
+ input_streams = {input_streams.at(0), new_input_stream_};
+ }
}
static ITransformingStep::Traits getStorageJoinTraits()
diff --git a/src/Processors/QueryPlan/JoinStep.h b/src/Processors/QueryPlan/JoinStep.h
index 1ea0e9b366d..fc7f74d4fe8 100644
--- a/src/Processors/QueryPlan/JoinStep.h
+++ b/src/Processors/QueryPlan/JoinStep.h
@@ -28,8 +28,9 @@ public:
void describePipeline(FormatSettings & settings) const override;
const JoinPtr & getJoin() const { return join; }
+ bool allowPushDownToRight() const;
- void updateLeftStream(const DataStream & left_stream_);
+ void updateInputStream(const DataStream & new_input_stream_, size_t idx);
private:
JoinPtr join;
diff --git a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
index 0c17c27e7aa..680d158ecaf 100644
--- a/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
+++ b/src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
@@ -1,3 +1,10 @@
+#include
+
+#include
+#include
+
+#include
+
#include
#include
#include
@@ -11,13 +18,10 @@
#include
#include
#include
+
#include
#include
#include
-#include
-#include
-
-#include
namespace DB::ErrorCodes
{
@@ -39,7 +43,8 @@ static bool filterColumnIsNotAmongAggregatesArguments(const AggregateDescription
}
static size_t
-tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs, bool can_remove_filter = true)
+tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
+ bool can_remove_filter = true, size_t child_idx = 0)
{
QueryPlan::Node * child_node = parent_node->children.front();
@@ -53,7 +58,11 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
// std::cerr << "Filter: \n" << expression->dumpDAG() << std::endl;
- const auto & all_inputs = child->getInputStreams().front().header.getColumnsWithTypeAndName();
+ if (child_idx >= child->getInputStreams().size() || child_idx >= child_node->children.size())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Child index {} is out of range (streams: {}, children: {})",
+ child_idx, child->getInputStreams().size(), child_node->children.size());
+
+ const auto & all_inputs = child->getInputStreams()[child_idx].header.getColumnsWithTypeAndName();
auto split_filter = expression->cloneActionsForFilterPushDown(filter_column_name, removes_filter, allowed_inputs, all_inputs);
if (!split_filter)
@@ -75,7 +84,8 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
/// Expression/Filter -> Aggregating -> Something
auto & node = nodes.emplace_back();
node.children.emplace_back(&node);
- std::swap(node.children[0], child_node->children[0]);
+
+ std::swap(node.children[0], child_node->children[child_idx]);
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.
@@ -90,7 +100,9 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
else
{
if (auto * join = typeid_cast<JoinStep *>(child.get()))
- join->updateLeftStream(node.step->getOutputStream());
+ {
+ join->updateInputStream(node.step->getOutputStream(), child_idx);
+ }
else
throw Exception(
ErrorCodes::LOGICAL_ERROR, "We are trying to push down a filter through a step for which we cannot update input stream");
@@ -208,25 +220,29 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
if (auto * join = typeid_cast<JoinStep *>(child.get()))
{
- const auto & table_join = join->getJoin()->getTableJoin();
- /// Push down is for left table only. We need to update JoinStep for push down into right.
- /// Only inner and left join are supported. Other types may generate default values for left table keys.
- /// So, if we push down a condition like `key != 0`, not all rows may be filtered.
- if (table_join.kind() == ASTTableJoin::Kind::Inner || table_join.kind() == ASTTableJoin::Kind::Left)
+ auto join_push_down = [&](ASTTableJoin::Kind kind) -> size_t
{
- const auto & left_header = join->getInputStreams().front().header;
+ const auto & table_join = join->getJoin()->getTableJoin();
+
+ /// Only inner and left(/right) join are supported. Other types may generate default values for left table keys.
+ /// So, if we push down a condition like `key != 0`, not all rows may be filtered.
+ if (table_join.kind() != ASTTableJoin::Kind::Inner && table_join.kind() != kind)
+ return 0;
+
+ bool is_left = kind == ASTTableJoin::Kind::Left;
+ const auto & input_header = is_left ? join->getInputStreams().front().header : join->getInputStreams().back().header;
const auto & res_header = join->getOutputStream().header;
Names allowed_keys;
- const auto & source_columns = left_header.getNames();
+ const auto & source_columns = input_header.getNames();
for (const auto & name : source_columns)
{
/// Skip key if it is renamed.
/// I don't know if it is possible. Just in case.
- if (!left_header.has(name) || !res_header.has(name))
+ if (!input_header.has(name) || !res_header.has(name))
continue;
/// Skip if type is changed. Push down expression expect equal types.
- if (!left_header.getByName(name).type->equals(*res_header.getByName(name).type))
+ if (!input_header.getByName(name).type->equals(*res_header.getByName(name).type))
continue;
allowed_keys.push_back(name);
@@ -234,7 +250,21 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
const bool can_remove_filter
= std::find(source_columns.begin(), source_columns.end(), filter->getFilterColumnName()) == source_columns.end();
- if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter))
+ size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_keys, can_remove_filter, is_left ? 0 : 1);
+ if (updated_steps > 0)
+ {
+ LOG_DEBUG(&Poco::Logger::get("tryPushDownFilter"), "Pushed down filter to {} side of join", kind);
+ }
+ return updated_steps;
+ };
+
+ if (size_t updated_steps = join_push_down(ASTTableJoin::Kind::Left))
+ return updated_steps;
+
+ /// For full sorting merge join we push down both to the left and right tables, because left and right streams are not independent.
+ if (join->allowPushDownToRight())
+ {
+ if (size_t updated_steps = join_push_down(ASTTableJoin::Kind::Right))
return updated_steps;
}
}
diff --git a/src/Processors/Transforms/MergeJoinTransform.cpp b/src/Processors/Transforms/MergeJoinTransform.cpp
index 690f751209f..c7b7afab541 100644
--- a/src/Processors/Transforms/MergeJoinTransform.cpp
+++ b/src/Processors/Transforms/MergeJoinTransform.cpp
@@ -855,7 +855,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish()
{
- algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true);
+ algorithm.logElapsed(total_stopwatch.elapsedSeconds());
}
}
diff --git a/src/Processors/Transforms/MergeJoinTransform.h b/src/Processors/Transforms/MergeJoinTransform.h
index 0098e470e6f..9f60eafb455 100644
--- a/src/Processors/Transforms/MergeJoinTransform.h
+++ b/src/Processors/Transforms/MergeJoinTransform.h
@@ -233,19 +233,14 @@ public:
virtual void consume(Input & input, size_t source_num) override;
virtual Status merge() override;
- void logElapsed(double seconds, bool force)
+ void logElapsed(double seconds)
{
- /// Do not log more frequently than once per ten seconds
- if (seconds - stat.last_log_seconds < 10 && !force)
- return;
-
LOG_TRACE(log,
"Finished pocessing in {} seconds"
", left: {} blocks, {} rows; right: {} blocks, {} rows"
", max blocks loaded to memory: {}",
seconds, stat.num_blocks[0], stat.num_rows[0], stat.num_blocks[1], stat.num_rows[1],
stat.max_blocks_loaded);
- stat.last_log_seconds = seconds;
}
private:
@@ -277,8 +272,6 @@ private:
size_t num_rows[2] = {0, 0};
size_t max_blocks_loaded = 0;
-
- double last_log_seconds = 0;
};
Statistic stat;
@@ -303,12 +296,6 @@ public:
protected:
void onFinish() override;
- void work() override
- {
- algorithm.logElapsed(total_stopwatch.elapsedSeconds(), true);
- Base::work();
- }
-
Poco::Logger * log;
};
diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp
index e1c73b7ebbb..4178d0d62da 100644
--- a/src/Server/GRPCServer.cpp
+++ b/src/Server/GRPCServer.cpp
@@ -848,6 +848,7 @@ namespace
{
logs_queue = std::make_shared<InternalTextLogsQueue>();
logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
+ logs_queue->setSourceRegexp(settings.send_logs_source_regexp);
CurrentThread::attachInternalTextLogsQueue(logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]{ onFatalError(); });
}
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index d1d762d3c61..05565063893 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -241,6 +241,7 @@ void TCPHandler::runImpl()
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
+ state.logs_queue->setSourceRegexp(query_context->getSettingsRef().send_logs_source_regexp);
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
CurrentThread::setFatalErrorCallback([this]
{
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 5900ea0fdb7..64aaa40bd4c 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -6229,8 +6229,13 @@ try
part_log_elem.event_type = type;
if (part_log_elem.event_type == PartLogElement::MERGE_PARTS)
+ {
if (merge_entry)
+ {
part_log_elem.merge_reason = PartLogElement::getMergeReasonType((*merge_entry)->merge_type);
+ part_log_elem.merge_algorithm = PartLogElement::getMergeAlgorithm((*merge_entry)->merge_algorithm);
+ }
+ }
part_log_elem.error = static_cast(execution_status.code);
part_log_elem.exception = execution_status.message;
diff --git a/src/Storages/StorageMongoDB.cpp b/src/Storages/StorageMongoDB.cpp
index 1f2523c8645..dce45b2431a 100644
--- a/src/Storages/StorageMongoDB.cpp
+++ b/src/Storages/StorageMongoDB.cpp
@@ -15,6 +15,7 @@
#include
#include
#include
+#include
namespace DB
{
@@ -86,6 +87,62 @@ void StorageMongoDB::connectIfNotConnected()
}
+class StorageMongoDBSink : public SinkToStorage
+{
+public:
+ explicit StorageMongoDBSink(
+ const std::string & collection_name_,
+ const std::string & db_name_,
+ const StorageMetadataPtr & metadata_snapshot_,
+ std::shared_ptr<Poco::MongoDB::Connection> connection_)
+ : SinkToStorage(metadata_snapshot_->getSampleBlock())
+ , collection_name(collection_name_)
+ , db_name(db_name_)
+ , metadata_snapshot{metadata_snapshot_}
+ , connection(connection_)
+ {
+ }
+
+ String getName() const override { return "StorageMongoDBSink"; }
+
+ void consume(Chunk chunk) override
+ {
+ Poco::MongoDB::Database db(db_name);
+ Poco::MongoDB::Document::Ptr index = new Poco::MongoDB::Document();
+
+ auto block = getHeader().cloneWithColumns(chunk.detachColumns());
+
+ size_t num_rows = block.rows();
+ size_t num_cols = block.columns();
+
+ const auto columns = block.getColumns();
+ const auto data_types = block.getDataTypes();
+ const auto data_names = block.getNames();
+
+ std::vector<std::string> row(num_cols);
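+ /// Serialize each value to its text representation and add it to the MongoDB document under its column name.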
+ for (const auto i : collections::range(0, num_rows))
+ {
+ for (const auto j : collections::range(0, num_cols))
+ {
+ WriteBufferFromOwnString ostr;
+ data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{});
+ row[j] = ostr.str();
+ index->add(data_names[j], row[j]);
+ }
+ }
+ Poco::SharedPtr<Poco::MongoDB::InsertRequest> insert_request = db.createInsertRequest(collection_name);
+ insert_request->documents().push_back(index);
+ connection->sendRequest(*insert_request);
+ }
+
+private:
+ String collection_name;
+ String db_name;
+ StorageMetadataPtr metadata_snapshot;
+ std::shared_ptr<Poco::MongoDB::Connection> connection;
+};
+
+
Pipe StorageMongoDB::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@@ -109,6 +166,11 @@ Pipe StorageMongoDB::read(
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size));
}
+SinkToStoragePtr StorageMongoDB::write(const ASTPtr & /* query */, const StorageMetadataPtr & metadata_snapshot, ContextPtr /* context */)
+{
+ connectIfNotConnected();
+ return std::make_shared<StorageMongoDBSink>(collection_name, database_name, metadata_snapshot, connection);
+}
StorageMongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, ContextPtr context)
{
diff --git a/src/Storages/StorageMongoDB.h b/src/Storages/StorageMongoDB.h
index cb0654433bc..0e00b80432b 100644
--- a/src/Storages/StorageMongoDB.h
+++ b/src/Storages/StorageMongoDB.h
@@ -39,6 +39,11 @@ public:
size_t max_block_size,
unsigned num_streams) override;
+ SinkToStoragePtr write(
+ const ASTPtr & query,
+ const StorageMetadataPtr & /*metadata_snapshot*/,
+ ContextPtr context) override;
+
static StorageMongoDBConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
private:
diff --git a/src/Storages/System/StorageSystemStackTrace.cpp b/src/Storages/System/StorageSystemStackTrace.cpp
index 5d37aa5b08a..cdd04964f55 100644
--- a/src/Storages/System/StorageSystemStackTrace.cpp
+++ b/src/Storages/System/StorageSystemStackTrace.cpp
@@ -5,10 +5,14 @@
#include
#include
+#include
#include
#include
+#include
+#include
+#include
#include
#include
#include
@@ -16,8 +20,11 @@
#include
#include
#include
-#include
+#include
#include
+#include
+#include
+#include
namespace DB
@@ -147,13 +154,84 @@ namespace
throw Exception("Logical error: read wrong number of bytes from pipe", ErrorCodes::LOGICAL_ERROR);
}
}
+
+ ColumnPtr getFilteredThreadIds(ASTPtr query, ContextPtr context)
+ {
+ MutableColumnPtr all_thread_ids = ColumnUInt64::create();
+
+ std::filesystem::directory_iterator end;
+
+ /// There is no better way to enumerate threads in a process other than looking into procfs.
+ for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
+ {
+ pid_t tid = parse<pid_t>(it->path().filename());
+ all_thread_ids->insert(tid);
+ }
+
+ Block block { ColumnWithTypeAndName(std::move(all_thread_ids), std::make_shared<DataTypeUInt64>(), "thread_id") };
+ VirtualColumnUtils::filterBlockWithQuery(query, block, context);
+ return block.getByPosition(0).column;
+ }
+
+ using ThreadIdToName = std::unordered_map<UInt64, String, DefaultHash<UInt64>>;
+ ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const PaddedPODArray<UInt64> & thread_ids)
+ {
+ ThreadIdToName tid_to_name;
+ MutableColumnPtr all_thread_names = ColumnString::create();
+
+ for (UInt64 tid : thread_ids)
+ {
+ std::filesystem::path thread_name_path = fmt::format("/proc/self/task/{}/comm", tid);
+ String thread_name;
+ if (std::filesystem::exists(thread_name_path))
+ {
+ constexpr size_t comm_buf_size = 32; /// More than enough for thread name
+ ReadBufferFromFile comm(thread_name_path.string(), comm_buf_size);
+ readEscapedStringUntilEOL(thread_name, comm);
+ comm.close();
+ }
+
+ tid_to_name[tid] = thread_name;
+ all_thread_names->insert(thread_name);
+ }
+
+ Block block { ColumnWithTypeAndName(std::move(all_thread_names), std::make_shared<DataTypeString>(), "thread_name") };
+ VirtualColumnUtils::filterBlockWithQuery(query, block, context);
+ ColumnPtr thread_names = std::move(block.getByPosition(0).column);
+
+ std::unordered_set<String> filtered_thread_names;
+ for (size_t i = 0; i != thread_names->size(); ++i)
+ {
+ const auto & thread_name = thread_names->getDataAt(i);
+ filtered_thread_names.emplace(thread_name);
+ }
+
+ for (auto it = tid_to_name.begin(); it != tid_to_name.end();)
+ {
+ if (!filtered_thread_names.contains(it->second))
+ it = tid_to_name.erase(it);
+ else
+ ++it;
+ }
+
+ return tid_to_name;
+ }
}
StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
- : IStorageSystemOneBlock(table_id_)
+ : IStorage(table_id_)
, log(&Poco::Logger::get("StorageSystemStackTrace"))
{
+ StorageInMemoryMetadata storage_metadata;
+ storage_metadata.setColumns(ColumnsDescription({
+ { "thread_name", std::make_shared<DataTypeString>() },
+ { "thread_id", std::make_shared<DataTypeUInt64>() },
+ { "query_id", std::make_shared<DataTypeString>() },
+ { "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) },
+ }, { /* aliases */ }));
+ setInMemoryMetadata(storage_metadata);
+
notification_pipe.open();
/// Setup signal handler.
@@ -173,23 +251,40 @@ StorageSystemStackTrace::StorageSystemStackTrace(const StorageID & table_id_)
}
-NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
+Pipe StorageSystemStackTrace::read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum /*processed_stage*/,
+ const size_t /*max_block_size*/,
+ const unsigned /*num_streams*/)
{
- return
- {
- { "thread_name", std::make_shared<DataTypeString>() },
- { "thread_id", std::make_shared<DataTypeUInt64>() },
- { "query_id", std::make_shared<DataTypeString>() },
- { "trace", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) }
- };
-}
+ storage_snapshot->check(column_names);
-
-void StorageSystemStackTrace::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
-{
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
+ /// Create a mask of what columns are needed in the result.
+
+ NameSet names_set(column_names.begin(), column_names.end());
+
+ Block sample_block = storage_snapshot->metadata->getSampleBlock();
+
+ std::vector columns_mask(sample_block.columns());
+ for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
+ {
+ if (names_set.contains(sample_block.getByPosition(i).name))
+ {
+ columns_mask[i] = 1;
+ }
+ }
+
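+ /// Stack traces and query ids are obtained by sending a signal to each thread; thread names are read from procfs without signalling.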
+ bool send_signal = names_set.contains("trace") || names_set.contains("query_id");
+ bool read_thread_names = names_set.contains("thread_name");
+
+ MutableColumns res_columns = sample_block.cloneEmptyColumns();
+
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
@@ -197,71 +292,85 @@ void StorageSystemStackTrace::fillData(MutableColumns & res_columns, ContextPtr,
/// Obviously, results for different threads may be out of sync.
- /// There is no better way to enumerate threads in a process other than looking into procfs.
+ ColumnPtr thread_ids = getFilteredThreadIds(query_info.query, context);
+ const auto & thread_ids_data = assert_cast<const ColumnUInt64 &>(*thread_ids).getData();
- std::filesystem::directory_iterator end;
- for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
+ ThreadIdToName thread_names;
+ if (read_thread_names)
+ thread_names = getFilteredThreadNames(query_info.query, context, thread_ids_data);
+
+ for (UInt64 tid : thread_ids_data)
{
- pid_t tid = parse(it->path().filename());
-
- sigval sig_value{};
- sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
- if (0 != ::sigqueue(tid, sig, sig_value))
- {
- /// The thread may has been already finished.
- if (ESRCH == errno)
- continue;
-
- throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
- }
-
- std::filesystem::path thread_name_path = it->path();
- thread_name_path.append("comm");
+ size_t res_index = 0;
String thread_name;
- if (std::filesystem::exists(thread_name_path))
+ if (read_thread_names)
{
- constexpr size_t comm_buf_size = 32; /// More than enough for thread name
- ReadBufferFromFile comm(thread_name_path.string(), comm_buf_size);
- readEscapedStringUntilEOL(thread_name, comm);
- comm.close();
+ if (auto it = thread_names.find(tid); it != thread_names.end())
+ thread_name = it->second;
+ else
+ continue; /// was filtered out by "thread_name" condition
}
- /// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
-
- if (wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
+ if (!send_signal)
{
- size_t stack_trace_size = stack_trace.getSize();
- size_t stack_trace_offset = stack_trace.getOffset();
-
- Array arr;
- arr.reserve(stack_trace_size - stack_trace_offset);
- for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
- arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
-
- res_columns[0]->insert(thread_name);
- res_columns[1]->insert(tid);
- res_columns[2]->insertData(query_id_data, query_id_size);
- res_columns[3]->insert(arr);
+ res_columns[res_index++]->insert(thread_name);
+ res_columns[res_index++]->insert(tid);
+ res_columns[res_index++]->insertDefault();
+ res_columns[res_index++]->insertDefault();
}
else
{
- LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
+ sigval sig_value{};
- /// Cannot obtain a stack trace. But create a record in result nevertheless.
+ sig_value.sival_int = sequence_num.load(std::memory_order_acquire);
+ if (0 != ::sigqueue(tid, sig, sig_value))
+ {
+ /// The thread may has been already finished.
+ if (ESRCH == errno)
+ continue;
- res_columns[0]->insert(thread_name);
- res_columns[1]->insert(tid);
- res_columns[2]->insertDefault();
- res_columns[3]->insertDefault();
+ throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
+ }
+
+ /// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
+ if (send_signal && wait(100) && sig_value.sival_int == data_ready_num.load(std::memory_order_acquire))
+ {
+ size_t stack_trace_size = stack_trace.getSize();
+ size_t stack_trace_offset = stack_trace.getOffset();
+
+ Array arr;
+ arr.reserve(stack_trace_size - stack_trace_offset);
+ for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
+ arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFramePointers()[i]));
+
+ res_columns[res_index++]->insert(thread_name);
+ res_columns[res_index++]->insert(tid);
+ res_columns[res_index++]->insertData(query_id_data, query_id_size);
+ res_columns[res_index++]->insert(arr);
+ }
+ else
+ {
+ LOG_DEBUG(log, "Cannot obtain a stack trace for thread {}", tid);
+
+ res_columns[res_index++]->insert(thread_name);
+ res_columns[res_index++]->insert(tid);
+ res_columns[res_index++]->insertDefault();
+ res_columns[res_index++]->insertDefault();
+ }
+
+ /// Signed integer overflow is undefined behavior in both C and C++. However, according to
+ /// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
+ /// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
+ /// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
+ ++sequence_num;
}
-
- /// Signed integer overflow is undefined behavior in both C and C++. However, according to
- /// C++ standard, Atomic signed integer arithmetic is defined to use two's complement; there
- /// are no undefined results. See https://en.cppreference.com/w/cpp/atomic/atomic and
- /// http://eel.is/c++draft/atomics.types.generic#atomics.types.int-8
- ++sequence_num;
}
+
+ UInt64 num_rows = res_columns.at(0)->size();
+ Chunk chunk(std::move(res_columns), num_rows);
+
+ return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
}
diff --git a/src/Storages/System/StorageSystemStackTrace.h b/src/Storages/System/StorageSystemStackTrace.h
index c039ae53170..dd613882e49 100644
--- a/src/Storages/System/StorageSystemStackTrace.h
+++ b/src/Storages/System/StorageSystemStackTrace.h
@@ -3,7 +3,7 @@
#ifdef OS_LINUX /// Because of 'sigqueue' functions and RT signals.
#include
-#include <Storages/System/IStorageSystemOneBlock.h>
+#include <Storages/IStorage.h>
namespace Poco
{
@@ -19,20 +19,26 @@ class Context;
/// Allows to introspect stack trace of all server threads.
/// It acts like an embedded debugger.
/// More than one instance of this table cannot be used.
-class StorageSystemStackTrace final : public IStorageSystemOneBlock<StorageSystemStackTrace>
+class StorageSystemStackTrace final : public IStorage
{
public:
explicit StorageSystemStackTrace(const StorageID & table_id_);
String getName() const override { return "SystemStackTrace"; }
- static NamesAndTypesList getNamesAndTypes();
+
+ Pipe read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ unsigned num_streams) override;
+
+ bool isSystemStorage() const override { return true; }
protected:
- using IStorageSystemOneBlock::IStorageSystemOneBlock;
- void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
-
mutable std::mutex mutex;
-
Poco::Logger * log;
};
diff --git a/src/TableFunctions/TableFunctionMongoDB.cpp b/src/TableFunctions/TableFunctionMongoDB.cpp
new file mode 100644
index 00000000000..5e96b85e64c
--- /dev/null
+++ b/src/TableFunctions/TableFunctionMongoDB.cpp
@@ -0,0 +1,104 @@
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int BAD_ARGUMENTS;
+ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
+
+StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
+ ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
+{
+ auto columns = getActualTableStructure(context);
+ auto storage = std::make_shared<StorageMongoDB>(
+ StorageID(configuration->database, table_name),
+ configuration->host,
+ configuration->port,
+ configuration->database,
+ configuration->table,
+ configuration->username,
+ configuration->password,
+ configuration->options,
+ columns,
+ ConstraintsDescription(),
+ String{});
+ storage->startup();
+ return storage;
+}
+
+ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context) const
+{
+ return parseColumnsListFromString(structure, context);
+}
+
+void TableFunctionMongoDB::parseArguments(const ASTPtr & ast_function, ContextPtr context)
+{
+ const auto & func_args = ast_function->as<ASTFunction &>();
+ if (!func_args.arguments)
+ throw Exception("Table function 'mongodb' must have arguments.", ErrorCodes::BAD_ARGUMENTS);
+
+ ASTs & args = func_args.arguments->children;
+
+ if (args.size() < 6 || args.size() > 7)
+ {
+ throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ "Table function 'mongodb' requires 6 to 7 parameters: mongodb('host:port', database, collection, 'user', 'password', structure [, 'options'])");
+ }
+
+ ASTs main_arguments(args.begin(), args.begin() + 5);
+
+ for (size_t i = 5; i < args.size(); ++i)
+ {
+ if (const auto * ast_func = typeid_cast<const ASTFunction *>(args[i].get()))
+ {
+ const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_func->arguments.get());
+ auto function_args = args_expr->children;
+ if (function_args.size() != 2)
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
+
+ auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
+
+ if (arg_name == "structure")
+ structure = checkAndGetLiteralArgument<String>(function_args[1], "structure");
+ else if (arg_name == "options")
+ main_arguments.push_back(function_args[1]);
+ }
+ else if (i == 5)
+ {
+ structure = checkAndGetLiteralArgument<String>(args[i], "structure");
+ }
+ else if (i == 6)
+ {
+ main_arguments.push_back(args[i]);
+ }
+ }
+
+ configuration = StorageMongoDB::getConfiguration(main_arguments, context);
+}
+
+
+void registerTableFunctionMongoDB(TableFunctionFactory & factory)
+{
+ factory.registerFunction<TableFunctionMongoDB>();
+}
+
+}
diff --git a/src/TableFunctions/TableFunctionMongoDB.h b/src/TableFunctions/TableFunctionMongoDB.h
new file mode 100644
index 00000000000..40e4802e9e6
--- /dev/null
+++ b/src/TableFunctions/TableFunctionMongoDB.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace DB
+{
+
+class TableFunctionMongoDB : public ITableFunction
+{
+public:
+ static constexpr auto name = "mongodb";
+
+ std::string getName() const override { return name; }
+
+private:
+ StoragePtr executeImpl(
+ const ASTPtr & ast_function, ContextPtr context,
+ const std::string & table_name, ColumnsDescription cached_columns) const override;
+
+ const char * getStorageTypeName() const override { return "MongoDB"; }
+
+ ColumnsDescription getActualTableStructure(ContextPtr context) const override;
+ void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
+
+ std::optional<StorageMongoDBConfiguration> configuration;
+ String structure;
+};
+
+}
diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp
index e9d2fa18639..12ca4abe113 100644
--- a/src/TableFunctions/registerTableFunctions.cpp
+++ b/src/TableFunctions/registerTableFunctions.cpp
@@ -19,6 +19,7 @@ void registerTableFunctions()
registerTableFunctionValues(factory);
registerTableFunctionInput(factory);
registerTableFunctionGenerate(factory);
+ registerTableFunctionMongoDB(factory);
registerTableFunctionMeiliSearch(factory);
diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h
index 906cf7a74b9..49a1ef60a6b 100644
--- a/src/TableFunctions/registerTableFunctions.h
+++ b/src/TableFunctions/registerTableFunctions.h
@@ -17,6 +17,7 @@ void registerTableFunctionURL(TableFunctionFactory & factory);
void registerTableFunctionValues(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
void registerTableFunctionGenerate(TableFunctionFactory & factory);
+void registerTableFunctionMongoDB(TableFunctionFactory & factory);
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory);
diff --git a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
index a6e2d29c5d5..e414ae5a259 100644
--- a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -34,6 +34,16 @@
33554432
1
+
+ s3
+ http://minio1:9001/root/data/
+ minio
+ minio123
+ 33554432
+ 1
+ /jbod1/
+ 1000000000
+
@@ -67,6 +77,13 @@
+
+
+
+ s3_with_cache_and_jbod
+
+
+
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index 129adce47ca..3ce2a08ae74 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -26,6 +26,18 @@ def cluster():
],
with_minio=True,
)
+
+ cluster.add_instance(
+ "node_with_limited_disk",
+ main_configs=[
+ "configs/config.d/storage_conf.xml",
+ "configs/config.d/bg_processing_pool_conf.xml",
+ ],
+ with_minio=True,
+ tmpfs=[
+ "/jbod1:size=2M",
+ ],
+ )
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@@ -678,3 +690,22 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
+
+
+@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
+def test_cache_with_full_disk_space(cluster, node_name):
+ node = cluster.instances[node_name]
+ node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
+ node.query(
+ "CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
+ )
+ node.query(
+ "INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
+ )
+ node.query(
+ "SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
+ )
+ assert node.contains_in_log(
+ "Insert into cache is skipped due to insufficient disk space"
+ )
+ node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
diff --git a/tests/integration/test_storage_mongodb/test.py b/tests/integration/test_storage_mongodb/test.py
index d8ca207d0a6..74b2b15fda0 100644
--- a/tests/integration/test_storage_mongodb/test.py
+++ b/tests/integration/test_storage_mongodb/test.py
@@ -253,3 +253,30 @@ def test_missing_columns(started_cluster):
result = node.query("SELECT count() FROM simple_mongo_table WHERE isNull(data)")
assert result == "10\n"
simple_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_simple_insert_select(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ simple_mongo_table = db["simple_table"]
+
+ node = started_cluster.instances["node"]
+ node.query("DROP TABLE IF EXISTS simple_mongo_table")
+ node.query(
+ "CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')"
+ )
+ node.query("INSERT INTO simple_mongo_table SELECT 1, 'kek'")
+
+ assert (
+ node.query("SELECT data from simple_mongo_table where key = 1").strip() == "kek"
+ )
+ node.query("INSERT INTO simple_mongo_table(key) SELECT 12")
+ assert int(node.query("SELECT count() from simple_mongo_table")) == 2
+ assert (
+ node.query("SELECT data from simple_mongo_table where key = 12").strip() == ""
+ )
+
+ node.query("DROP TABLE simple_mongo_table")
+ simple_mongo_table.drop()
diff --git a/tests/integration/test_table_function_mongodb/__init__.py b/tests/integration/test_table_function_mongodb/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_table_function_mongodb/configs_secure/config.d/ssl_conf.xml b/tests/integration/test_table_function_mongodb/configs_secure/config.d/ssl_conf.xml
new file mode 100644
index 00000000000..3efe98e7045
--- /dev/null
+++ b/tests/integration/test_table_function_mongodb/configs_secure/config.d/ssl_conf.xml
@@ -0,0 +1,8 @@
+
+
+
+
+ none
+
+
+
diff --git a/tests/integration/test_table_function_mongodb/test.py b/tests/integration/test_table_function_mongodb/test.py
new file mode 100644
index 00000000000..e0ad71b0079
--- /dev/null
+++ b/tests/integration/test_table_function_mongodb/test.py
@@ -0,0 +1,276 @@
+import pymongo
+
+import pytest
+from helpers.client import QueryRuntimeException
+
+from helpers.cluster import ClickHouseCluster
+
+
+@pytest.fixture(scope="module")
+def started_cluster(request):
+ try:
+ cluster = ClickHouseCluster(__file__)
+ node = cluster.add_instance(
+ "node",
+ with_mongo=True,
+ main_configs=[
+ "configs_secure/config.d/ssl_conf.xml",
+ ],
+ with_mongo_secure=request.param,
+ )
+ cluster.start()
+ yield cluster
+ finally:
+ cluster.shutdown()
+
+
+def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
+ connection_str = ""
+ if with_credentials:
+ connection_str = "mongodb://root:clickhouse@localhost:{}".format(
+ started_cluster.mongo_port
+ )
+ else:
+ connection_str = "mongodb://localhost:{}".format(
+ started_cluster.mongo_no_cred_port
+ )
+ if secure:
+ connection_str += "/?tls=true&tlsAllowInvalidCertificates=true"
+ return pymongo.MongoClient(connection_str)
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_simple_select(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ simple_mongo_table = db["simple_table"]
+
+ node = started_cluster.instances["node"]
+ for i in range(0, 100):
+ node.query(
+ "INSERT INTO FUNCTION mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') (key, data) VALUES ({}, '{}')".format(
+ i, hex(i * i)
+ )
+ )
+ assert (
+ node.query(
+ "SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
+ )
+ == "100\n"
+ )
+ assert (
+ node.query(
+ "SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
+ )
+ == str(sum(range(0, 100))) + "\n"
+ )
+ assert (
+ node.query(
+ "SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String')"
+ )
+ == str(sum(range(0, 100))) + "\n"
+ )
+
+ assert (
+ node.query(
+ "SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String') where key = 42"
+ )
+ == hex(42 * 42) + "\n"
+ )
+ simple_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_complex_data_type(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ incomplete_mongo_table = db["complex_table"]
+ data = []
+ for i in range(0, 100):
+ data.append({"key": i, "data": hex(i * i), "dict": {"a": i, "b": str(i)}})
+ incomplete_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+
+ assert (
+ node.query(
+ "SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
+ )
+ == "100\n"
+ )
+ assert (
+ node.query(
+ "SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)')"
+ )
+ == str(sum(range(0, 100))) + "\n"
+ )
+
+ assert (
+ node.query(
+ "SELECT data from mongodb('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse', structure='key UInt64, data String, dict Map(UInt64, String)') where key = 42"
+ )
+ == hex(42 * 42) + "\n"
+ )
+ incomplete_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_incorrect_data_type(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ strange_mongo_table = db["strange_table"]
+ data = []
+ for i in range(0, 100):
+ data.append({"key": i, "data": hex(i * i), "aaaa": "Hello"})
+ strange_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+
+ with pytest.raises(QueryRuntimeException):
+ node.query(
+ "SELECT aaaa FROM mongodb('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse', structure='key UInt64, data String')"
+ )
+
+ strange_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [True], indirect=["started_cluster"])
+def test_secure_connection(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster, secure=True)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ simple_mongo_table = db["simple_table"]
+ data = []
+ for i in range(0, 100):
+ data.append({"key": i, "data": hex(i * i)})
+ simple_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+
+ assert (
+ node.query(
+ "SELECT COUNT() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
+ )
+ == "100\n"
+ )
+ assert (
+ node.query(
+ "SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true')"
+ )
+ == str(sum(range(0, 100))) + "\n"
+ )
+ assert (
+ node.query(
+ "SELECT sum(key) FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'key UInt64, data String', 'ssl=true')"
+ )
+ == str(sum(range(0, 100))) + "\n"
+ )
+
+ assert (
+ node.query(
+ "SELECT data from mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='ssl=true') where key = 42"
+ )
+ == hex(42 * 42) + "\n"
+ )
+ simple_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_predefined_connection_configuration(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ simple_mongo_table = db["simple_table"]
+ data = []
+ for i in range(0, 100):
+ data.append({"key": i, "data": hex(i * i)})
+ simple_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+ assert (
+ node.query(
+ "SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
+ )
+ == "100\n"
+ )
+ simple_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_no_credentials(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
+ db = mongo_connection["test"]
+ simple_mongo_table = db["simple_table"]
+ data = []
+ for i in range(0, 100):
+ data.append({"key": i, "data": hex(i * i)})
+ simple_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+ assert (
+ node.query(
+ "SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', '', '', structure='key UInt64, data String')"
+ )
+ == "100\n"
+ )
+ simple_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_auth_source(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
+ admin_db = mongo_connection["admin"]
+ admin_db.add_user(
+ "root",
+ "clickhouse",
+ roles=[{"role": "userAdminAnyDatabase", "db": "admin"}, "readWriteAnyDatabase"],
+ )
+ simple_mongo_table = admin_db["simple_table"]
+ data = []
+ for i in range(0, 50):
+ data.append({"key": i, "data": hex(i * i)})
+ simple_mongo_table.insert_many(data)
+ db = mongo_connection["test"]
+ simple_mongo_table = db["simple_table"]
+ data = []
+ for i in range(0, 100):
+ data.append({"key": i, "data": hex(i * i)})
+ simple_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+
+ node.query_and_get_error(
+ "SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String')"
+ )
+
+ assert (
+ node.query(
+ "SELECT count() FROM mongodb('mongo2:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data String', options='authSource=admin')"
+ )
+ == "100\n"
+ )
+ simple_mongo_table.drop()
+
+
+@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
+def test_missing_columns(started_cluster):
+ mongo_connection = get_mongo_connection(started_cluster)
+ db = mongo_connection["test"]
+ db.add_user("root", "clickhouse")
+ simple_mongo_table = db["simple_table"]
+ data = []
+ for i in range(0, 10):
+ data.append({"key": i, "data": hex(i * i)})
+ for i in range(0, 10):
+ data.append({"key": i})
+ simple_mongo_table.insert_many(data)
+
+ node = started_cluster.instances["node"]
+ result = node.query(
+ "SELECT count() FROM mongodb('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', structure='key UInt64, data Nullable(String)') WHERE isNull(data)"
+ )
+ assert result == "10\n"
+ simple_mongo_table.drop()
diff --git a/tests/queries/0_stateless/01051_system_stack_trace.reference b/tests/queries/0_stateless/01051_system_stack_trace.reference
index d00491fd7e5..b82bda76142 100644
--- a/tests/queries/0_stateless/01051_system_stack_trace.reference
+++ b/tests/queries/0_stateless/01051_system_stack_trace.reference
@@ -1 +1,18 @@
+-- { echo }
+SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
+1
+-- optimization for not reading /proc/self/task/{}/comm and avoiding sending a signal
+SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
+1
+-- optimization for trace
+SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
+1
+-- optimization for query_id
+SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
+1
+-- optimization for thread_name
+SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
+1
+-- enough rows (optimizations work "correctly")
+SELECT count() > 100 FROM system.stack_trace;
1
diff --git a/tests/queries/0_stateless/01051_system_stack_trace.sql b/tests/queries/0_stateless/01051_system_stack_trace.sql
index e495e2198ea..d018d01fa22 100644
--- a/tests/queries/0_stateless/01051_system_stack_trace.sql
+++ b/tests/queries/0_stateless/01051_system_stack_trace.sql
@@ -1,4 +1,14 @@
-- Tags: race
--- at least this query should be present
+-- { echo }
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
+-- optimization for not reading /proc/self/task/{}/comm and avoiding sending a signal
+SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
+-- optimization for trace
+SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
+-- optimization for query_id
+SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
+-- optimization for thread_name
+SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
+-- enough rows (optimizations work "correctly")
+SELECT count() > 100 FROM system.stack_trace;
diff --git a/tests/queries/0_stateless/02297_regex_parsing_file_names.reference b/tests/queries/0_stateless/02297_regex_parsing_file_names.reference
new file mode 100644
index 00000000000..b4de3947675
--- /dev/null
+++ b/tests/queries/0_stateless/02297_regex_parsing_file_names.reference
@@ -0,0 +1 @@
+11
diff --git a/tests/queries/0_stateless/02297_regex_parsing_file_names.sh b/tests/queries/0_stateless/02297_regex_parsing_file_names.sh
new file mode 100755
index 00000000000..12ccb54235b
--- /dev/null
+++ b/tests/queries/0_stateless/02297_regex_parsing_file_names.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+# Tags: no-parallel
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+# Data preparation.
+
+# We can get the user_files_path by using the file table function as a trick; alternatively, it can be obtained with a query such as:
+# "insert into function file('exist.txt', 'CSV', 'val1 char') values ('aaaa'); select _path from file('exist.txt', 'CSV', 'val1 char')"
+CLICKHOUSE_USER_FILES_PATH=$(clickhouse-client --query "select _path, _file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
+
+mkdir -p ${CLICKHOUSE_USER_FILES_PATH}/
+
+rm -rf ${CLICKHOUSE_USER_FILES_PATH}/file_{0..10}.csv
+
+for i in {0..10}; do
+    echo '0' > ${CLICKHOUSE_USER_FILES_PATH}/file_${i}.csv
+done
+
+${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_regex;"
+
+${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_regex (id UInt64) ENGINE = MergeTree() order by id;"
+
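+# The {0..10} range in the path matches file_0.csv .. file_10.csv, so 11 rows are expected.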
+${CLICKHOUSE_CLIENT} -q "INSERT INTO t_regex SELECT * FROM file('file_{0..10}.csv','CSV');"
+${CLICKHOUSE_CLIENT} -q "SELECT count() from t_regex;"
+
+rm -rf ${CLICKHOUSE_USER_FILES_PATH}/file_{0..10}.csv;
+${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_regex;"
diff --git a/tests/queries/0_stateless/02353_translate.reference b/tests/queries/0_stateless/02353_translate.reference
new file mode 100644
index 00000000000..557b5182127
--- /dev/null
+++ b/tests/queries/0_stateless/02353_translate.reference
@@ -0,0 +1,16 @@
+Hello, world!
+cagaacgttc
+jihgfe
+jihgff
+jihgfg
+jihgfh
+jihgfi
+HotelGenev
+ードとは
+¿йðՅন𐐏
+¿йðՅনন
+¿йðՅনՅ
+¿йðՅনð
+¿йðՅনй
+abc
+abc
diff --git a/tests/queries/0_stateless/02353_translate.sql b/tests/queries/0_stateless/02353_translate.sql
new file mode 100644
index 00000000000..a7059ec85a7
--- /dev/null
+++ b/tests/queries/0_stateless/02353_translate.sql
@@ -0,0 +1,13 @@
+SELECT translate('Hello? world.', '.?', '!,');
+SELECT translate('gtcttgcaag', 'ACGTacgt', 'TGCAtgca');
+SELECT translate(toString(number), '0123456789', 'abcdefghij') FROM numbers(987654, 5);
+
+SELECT translateUTF8('HôtelGenèv', 'Ááéíóúôè', 'aaeiouoe');
+SELECT translateUTF8('中文内码', '久标准中文内码', 'ユニコードとは');
+SELECT translateUTF8(toString(number), '1234567890', 'ዩय𐑿𐐏নՅðй¿ค') FROM numbers(987654, 5);
+
+SELECT translate('abc', '', '');
+SELECT translateUTF8('abc', '', '');
+
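+-- Invalid replacement sets (multi-byte characters for translate, mismatched lengths for translateUTF8) must be rejected.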
+SELECT translate('abc', 'Ááéíóúôè', 'aaeiouoe'); -- { serverError 36 }
+SELECT translateUTF8('abc', 'efg', ''); -- { serverError 36 }
diff --git a/tests/queries/0_stateless/02357_query_cancellation_race.reference b/tests/queries/0_stateless/02357_query_cancellation_race.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/02357_query_cancellation_race.sh b/tests/queries/0_stateless/02357_query_cancellation_race.sh
new file mode 100755
index 00000000000..6b20e050ce3
--- /dev/null
+++ b/tests/queries/0_stateless/02357_query_cancellation_race.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Tags: race
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -q "create table tab (x UInt64, y String) engine = MergeTree order by x"
+for _ in $(seq 1 100); do timeout -s 2 0.05 $CLICKHOUSE_CLIENT --interactive_delay 1000 -q "insert into tab select number, toString(number) from system.numbers" || true; done
diff --git a/tests/queries/0_stateless/02359_send_logs_source_regexp.reference b/tests/queries/0_stateless/02359_send_logs_source_regexp.reference
new file mode 100644
index 00000000000..d00491fd7e5
--- /dev/null
+++ b/tests/queries/0_stateless/02359_send_logs_source_regexp.reference
@@ -0,0 +1 @@
+1
diff --git a/tests/queries/0_stateless/02359_send_logs_source_regexp.sh b/tests/queries/0_stateless/02359_send_logs_source_regexp.sh
new file mode 100755
index 00000000000..d3b60bc59f4
--- /dev/null
+++ b/tests/queries/0_stateless/02359_send_logs_source_regexp.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+[ ! -z "$CLICKHOUSE_CLIENT_REDEFINED" ] && CLICKHOUSE_CLIENT=$CLICKHOUSE_CLIENT_REDEFINED
+
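+# Force send_logs_level=trace and ask the server to send only log lines whose source matches the regexp;
+# any line from another source would survive the 'grep -v' filter below and show up as unexpected output.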
+CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=trace/g')
+regexp="executeQuery|InterpreterSelectQuery"
+$CLICKHOUSE_CLIENT --send_logs_source_regexp "$regexp" -q "SELECT 1;" 2> >(grep -v -E "$regexp" 1>&2)
diff --git a/tests/queries/0_stateless/02362_part_log_merge_algorithm.reference b/tests/queries/0_stateless/02362_part_log_merge_algorithm.reference
new file mode 100644
index 00000000000..91a959d4255
--- /dev/null
+++ b/tests/queries/0_stateless/02362_part_log_merge_algorithm.reference
@@ -0,0 +1,5 @@
+data_horizontal all_1_1_0 NewPart Undecided
+data_horizontal all_1_1_1 MergeParts Horizontal
+data_vertical all_1_1_0 NewPart Undecided
+data_vertical all_2_2_0 NewPart Undecided
+data_vertical all_1_2_1 MergeParts Vertical
diff --git a/tests/queries/0_stateless/02362_part_log_merge_algorithm.sql b/tests/queries/0_stateless/02362_part_log_merge_algorithm.sql
new file mode 100644
index 00000000000..6446b46c393
--- /dev/null
+++ b/tests/queries/0_stateless/02362_part_log_merge_algorithm.sql
@@ -0,0 +1,26 @@
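+-- With a single small part and one column, the merge uses the default Horizontal algorithm.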
+CREATE TABLE data_horizontal (
+ key Int
+)
+Engine=MergeTree()
+ORDER BY key;
+
+INSERT INTO data_horizontal VALUES (1);
+OPTIMIZE TABLE data_horizontal FINAL;
+SYSTEM FLUSH LOGS;
+SELECT table, part_name, event_type, merge_algorithm FROM system.part_log WHERE event_date >= yesterday() AND database = currentDatabase() AND table = 'data_horizontal' ORDER BY event_time_microseconds;
+
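+-- Wide parts plus activation thresholds of 1 row / 1 column force the Vertical merge algorithm.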
+CREATE TABLE data_vertical
+(
+ key UInt64,
+ value String
+)
+ENGINE = MergeTree()
+ORDER BY key
+SETTINGS index_granularity_bytes = 0, enable_mixed_granularity_parts = 0, min_bytes_for_wide_part = 0,
+vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
+
+INSERT INTO data_vertical VALUES (1, '1');
+INSERT INTO data_vertical VALUES (2, '2');
+OPTIMIZE TABLE data_vertical FINAL;
+SYSTEM FLUSH LOGS;
+SELECT table, part_name, event_type, merge_algorithm FROM system.part_log WHERE event_date >= yesterday() AND database = currentDatabase() AND table = 'data_vertical' ORDER BY event_time_microseconds;
diff --git a/tests/queries/0_stateless/02363_mapupdate_improve.reference b/tests/queries/0_stateless/02363_mapupdate_improve.reference
new file mode 100644
index 00000000000..04e2b943929
--- /dev/null
+++ b/tests/queries/0_stateless/02363_mapupdate_improve.reference
@@ -0,0 +1,10 @@
+{'fruit':'apple','season':'autumn'}
+{'fruit':'apple','season':'autumn'}
+{'fruit':'apple','season':'autumn'}
+{'fruit':'apple','season':'autumn'}
+{'fruit':'apple','season':'autumn'}
+{'season':'autumn','fruit':'apple'}
+{'season':'autumn','fruit':'apple'}
+{'season':'autumn','fruit':'apple'}
+{'season':'autumn','fruit':'apple'}
+{'season':'autumn','fruit':'apple'}
diff --git a/tests/queries/0_stateless/02363_mapupdate_improve.sql b/tests/queries/0_stateless/02363_mapupdate_improve.sql
new file mode 100644
index 00000000000..6b7723cc9b4
--- /dev/null
+++ b/tests/queries/0_stateless/02363_mapupdate_improve.sql
@@ -0,0 +1,11 @@
+-- Tags: no-backward-compatibility-check
+DROP TABLE IF EXISTS map_test;
+CREATE TABLE map_test(`tags` Map(String, String)) ENGINE = MergeTree PRIMARY KEY tags ORDER BY tags SETTINGS index_granularity = 8192;
+INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
+INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
+INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
+INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
+INSERT INTO map_test (tags) VALUES (map('fruit','apple','color','red'));
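+-- mapFilter keeps only the 'fruit' key; mapUpdate merges it with the literal map,
+-- and the argument order only affects the key order in the result.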
+SELECT mapUpdate(mapFilter((k, v) -> (k in ('fruit')), tags), map('season', 'autumn')) FROM map_test;
+SELECT mapUpdate(map('season','autumn'), mapFilter((k, v) -> (k in ('fruit')), tags)) FROM map_test;
+DROP TABLE map_test;