Merge branch 'master' into better-priority-queue

Alexey Milovidov 2019-12-22 14:13:21 +03:00
commit 906bfdbaa7
36 changed files with 453 additions and 167 deletions

.gitmodules vendored

@@ -67,10 +67,10 @@
url = https://github.com/ClickHouse-Extras/libgsasl.git
[submodule "contrib/libcxx"]
path = contrib/libcxx
url = https://github.com/llvm-mirror/libcxx.git
url = https://github.com/ClickHouse-Extras/libcxx.git
[submodule "contrib/libcxxabi"]
path = contrib/libcxxabi
url = https://github.com/llvm-mirror/libcxxabi.git
url = https://github.com/ClickHouse-Extras/libcxxabi.git
[submodule "contrib/snappy"]
path = contrib/snappy
url = https://github.com/google/snappy


@@ -1,7 +1,5 @@
if (COMPILER_CLANG)
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++" ON)
option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED})
endif()
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++" ${NOT_UNBUNDLED})
option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED})
if (USE_LIBCXX)
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=0") # More checks in debug build.

contrib/libcxx vendored

@@ -1 +1 @@
Subproject commit 9807685d51db467e097ad5eb8d5c2c16922794b2
Subproject commit f7c63235238a71b7e0563fab8c7c5ec1b54831f6


@@ -3,41 +3,43 @@ include(CheckCXXCompilerFlag)
set(LIBCXX_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxx)
set(SRCS
${LIBCXX_SOURCE_DIR}/src/optional.cpp
${LIBCXX_SOURCE_DIR}/src/variant.cpp
${LIBCXX_SOURCE_DIR}/src/chrono.cpp
${LIBCXX_SOURCE_DIR}/src/thread.cpp
${LIBCXX_SOURCE_DIR}/src/experimental/memory_resource.cpp
${LIBCXX_SOURCE_DIR}/src/iostream.cpp
${LIBCXX_SOURCE_DIR}/src/strstream.cpp
${LIBCXX_SOURCE_DIR}/src/ios.cpp
${LIBCXX_SOURCE_DIR}/src/future.cpp
${LIBCXX_SOURCE_DIR}/src/shared_mutex.cpp
${LIBCXX_SOURCE_DIR}/src/condition_variable.cpp
${LIBCXX_SOURCE_DIR}/src/hash.cpp
${LIBCXX_SOURCE_DIR}/src/string.cpp
${LIBCXX_SOURCE_DIR}/src/debug.cpp
${LIBCXX_SOURCE_DIR}/src/stdexcept.cpp
${LIBCXX_SOURCE_DIR}/src/utility.cpp
${LIBCXX_SOURCE_DIR}/src/any.cpp
${LIBCXX_SOURCE_DIR}/src/exception.cpp
${LIBCXX_SOURCE_DIR}/src/memory.cpp
${LIBCXX_SOURCE_DIR}/src/new.cpp
${LIBCXX_SOURCE_DIR}/src/valarray.cpp
${LIBCXX_SOURCE_DIR}/src/vector.cpp
${LIBCXX_SOURCE_DIR}/src/algorithm.cpp
${LIBCXX_SOURCE_DIR}/src/functional.cpp
${LIBCXX_SOURCE_DIR}/src/regex.cpp
${LIBCXX_SOURCE_DIR}/src/any.cpp
${LIBCXX_SOURCE_DIR}/src/bind.cpp
${LIBCXX_SOURCE_DIR}/src/mutex.cpp
${LIBCXX_SOURCE_DIR}/src/charconv.cpp
${LIBCXX_SOURCE_DIR}/src/typeinfo.cpp
${LIBCXX_SOURCE_DIR}/src/locale.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/operations.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/int128_builtins.cpp
${LIBCXX_SOURCE_DIR}/src/chrono.cpp
${LIBCXX_SOURCE_DIR}/src/condition_variable.cpp
${LIBCXX_SOURCE_DIR}/src/condition_variable_destructor.cpp
${LIBCXX_SOURCE_DIR}/src/debug.cpp
${LIBCXX_SOURCE_DIR}/src/exception.cpp
${LIBCXX_SOURCE_DIR}/src/experimental/memory_resource.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/directory_iterator.cpp
${LIBCXX_SOURCE_DIR}/src/system_error.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/int128_builtins.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/operations.cpp
${LIBCXX_SOURCE_DIR}/src/functional.cpp
${LIBCXX_SOURCE_DIR}/src/future.cpp
${LIBCXX_SOURCE_DIR}/src/hash.cpp
${LIBCXX_SOURCE_DIR}/src/ios.cpp
${LIBCXX_SOURCE_DIR}/src/iostream.cpp
${LIBCXX_SOURCE_DIR}/src/locale.cpp
${LIBCXX_SOURCE_DIR}/src/memory.cpp
${LIBCXX_SOURCE_DIR}/src/mutex.cpp
${LIBCXX_SOURCE_DIR}/src/mutex_destructor.cpp
${LIBCXX_SOURCE_DIR}/src/new.cpp
${LIBCXX_SOURCE_DIR}/src/optional.cpp
${LIBCXX_SOURCE_DIR}/src/random.cpp
${LIBCXX_SOURCE_DIR}/src/regex.cpp
${LIBCXX_SOURCE_DIR}/src/shared_mutex.cpp
${LIBCXX_SOURCE_DIR}/src/stdexcept.cpp
${LIBCXX_SOURCE_DIR}/src/string.cpp
${LIBCXX_SOURCE_DIR}/src/strstream.cpp
${LIBCXX_SOURCE_DIR}/src/system_error.cpp
${LIBCXX_SOURCE_DIR}/src/thread.cpp
${LIBCXX_SOURCE_DIR}/src/typeinfo.cpp
${LIBCXX_SOURCE_DIR}/src/utility.cpp
${LIBCXX_SOURCE_DIR}/src/valarray.cpp
${LIBCXX_SOURCE_DIR}/src/variant.cpp
${LIBCXX_SOURCE_DIR}/src/vector.cpp
)
add_library(cxx ${SRCS})

contrib/libcxxabi vendored

@@ -1 +1 @@
Subproject commit d56efcc7a52739518dbe7df9e743073e00951fa1
Subproject commit c26cf36f8387c5edf2cabb4a630f0975c35aa9fb


@@ -77,9 +77,7 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# Add compiler options only to c++ compiler
function(add_cxx_compile_options option)
add_compile_options(
"$<$<STREQUAL:$<TARGET_PROPERTY:LINKER_LANGUAGE>,CXX>:${option}>"
)
add_compile_options("$<$<STREQUAL:$<TARGET_PROPERTY:LINKER_LANGUAGE>,CXX>:${option}>")
endfunction()
# Warn about boolean expression compared with an integer value different from true/false
add_cxx_compile_options(-Wbool-compare)
@@ -113,8 +111,8 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
add_cxx_compile_options(-Wnon-virtual-dtor)
# Obvious
add_cxx_compile_options(-Wno-return-local-addr)
# Obvious
add_cxx_compile_options(-Wnull-dereference)
# This warning is disabled due to false positives if compiled with libc++: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=90037
#add_cxx_compile_options(-Wnull-dereference)
# Obvious
add_cxx_compile_options(-Wodr)
# Obvious


@@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <Common/CpuId.h>
#include <Common/quoteString.h>
#include <common/getMemoryAmount.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>
@@ -142,7 +143,7 @@ bool PerformanceTest::checkPreconditions() const
if (!exist)
{
LOG_WARNING(log, "Table " << table_to_check << " doesn't exist");
LOG_WARNING(log, "Table " << backQuote(table_to_check) << " doesn't exist");
return false;
}
}
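All the identifier-quoting changes in this commit (here, and below in TCPHandler, DatabasesCommon and StorageKafka) follow one pattern: wrap user-supplied names with DB::backQuote so log and error messages stay unambiguous when a name contains spaces or dots. A minimal stand-in sketch — the helper name here is hypothetical; the real helper lives in Common/quoteString.h and also escapes characters inside the name:

#include <iostream>
#include <string>

// Illustrative stand-in for DB::backQuote; escaping of inner characters is omitted.
static std::string backQuoteSketch(const std::string & name)
{
    return "`" + name + "`";
}

int main()
{
    std::cout << "Table " << backQuoteSketch("hits v2") << " doesn't exist\n";
}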


@@ -1,5 +1,7 @@
#include "PrometheusMetricsWriter.h"
#include <algorithm>
#include <IO/WriteHelpers.h>
namespace
@@ -20,6 +22,11 @@ void writeOutLine(DB::WriteBuffer & wb, T && val, TArgs &&... args)
writeOutLine(wb, std::forward<TArgs>(args)...);
}
void replaceInvalidChars(std::string & metric_name)
{
std::replace(metric_name.begin(), metric_name.end(), '.', '_');
}
}
@@ -47,6 +54,7 @@ void PrometheusMetricsWriter::write(WriteBuffer & wb) const
std::string metric_name{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
std::string metric_doc{ProfileEvents::getDocumentation(static_cast<ProfileEvents::Event>(i))};
replaceInvalidChars(metric_name);
std::string key{profile_events_prefix + metric_name};
writeOutLine(wb, "# HELP", key, metric_doc);
@@ -64,6 +72,7 @@ void PrometheusMetricsWriter::write(WriteBuffer & wb) const
std::string metric_name{CurrentMetrics::getName(static_cast<CurrentMetrics::Metric>(i))};
std::string metric_doc{CurrentMetrics::getDocumentation(static_cast<CurrentMetrics::Metric>(i))};
replaceInvalidChars(metric_name);
std::string key{current_metrics_prefix + metric_name};
writeOutLine(wb, "# HELP", key, metric_doc);
@@ -78,6 +87,8 @@ void PrometheusMetricsWriter::write(WriteBuffer & wb) const
for (const auto & name_value : async_metrics_values)
{
std::string key{asynchronous_metrics_prefix + name_value.first};
replaceInvalidChars(key);
auto value = name_value.second;
// TODO: add HELP section? asynchronous_metrics contains only key and value


@@ -28,9 +28,9 @@ private:
const bool send_metrics;
const bool send_asynchronous_metrics;
static inline constexpr auto profile_events_prefix = "ClickHouseProfileEvents";
static inline constexpr auto current_metrics_prefix = "ClickHouseMetrics";
static inline constexpr auto asynchronous_metrics_prefix = "ClickHouseAsyncMetrics";
static inline constexpr auto profile_events_prefix = "ClickHouseProfileEvents_";
static inline constexpr auto current_metrics_prefix = "ClickHouseMetrics_";
static inline constexpr auto asynchronous_metrics_prefix = "ClickHouseAsyncMetrics_";
};
}
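The underscore-terminated prefixes above, combined with replaceInvalidChars from the previous hunk, produce keys such as ClickHouseProfileEvents_Query. Prometheus metric names must match [a-zA-Z_:][a-zA-Z0-9_:]*, so dots in internal metric names have to become underscores. A standalone sketch of the combined rule (helper name hypothetical):

#include <algorithm>
#include <iostream>
#include <string>

static std::string prometheusKey(std::string name, const std::string & prefix)
{
    std::replace(name.begin(), name.end(), '.', '_'); // '.' is invalid in Prometheus metric names
    return prefix + name;                             // prefix already ends with '_'
}

int main()
{
    // Prints "ClickHouseAsyncMetrics_jemalloc_background_thread_run_interval"
    std::cout << prometheusKey("jemalloc.background_thread.run_interval",
                               "ClickHouseAsyncMetrics_") << '\n';
}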


@@ -110,7 +110,7 @@ void TCPHandler::runImpl()
{
if (!connection_context.isDatabaseExist(default_database))
{
Exception e("Database " + default_database + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
Exception e("Database " + backQuote(default_database) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText()
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);


@@ -318,7 +318,7 @@
<!--
<prometheus>
<endpoint>/metrics</endpoint>
<port>8001</port>
<port>9363</port>
<metrics>true</metrics>
<events>true</events>


@@ -454,7 +454,7 @@ Dwarf::AttributeValue Dwarf::readAttributeValue(std::string_view & sp, uint64_t
case DW_FORM_flag:
return uint64_t(read<uint8_t>(sp));
case DW_FORM_flag_present:
return 1;
return uint64_t(1);
case DW_FORM_sec_offset: [[fallthrough]];
case DW_FORM_ref_addr:
return readOffset(sp, is64Bit);
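A likely reason for the explicit cast above, assuming AttributeValue is a variant over uint64_t and a string type: uint64_t(1) names the chosen alternative directly instead of relying on an implicit int conversion, which stricter warning levels flag. A minimal sketch under that assumption:

#include <cstdint>
#include <string_view>
#include <variant>

// Assumed shape of the attribute value; the real Dwarf::AttributeValue may differ.
using AttributeValue = std::variant<std::uint64_t, std::string_view>;

AttributeValue flagPresent()
{
    return std::uint64_t(1); // the alternative is explicit; no implicit int step
}

int main()
{
    return std::get<std::uint64_t>(flagPresent()) == 1 ? 0 : 1;
}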


@@ -153,7 +153,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception("Table " + backQuote(name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
res = it->second;
tables.erase(it);
}


@@ -142,7 +142,7 @@ LibraryDictionarySource::LibraryDictionarySource(
if (!Poco::File(path).exists())
throw Exception(
"LibraryDictionarySource: Can't load lib " + toString() + ": " + Poco::File(path).path() + " - File doesn't exist",
"LibraryDictionarySource: Can't load library " + Poco::File(path).path() + ": file doesn't exist",
ErrorCodes::FILE_DOESNT_EXIST);
description.init(sample_block);


@@ -991,24 +991,37 @@
void executeBitmapData(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
const ColumnAggregateFunction * columns[2];
bool is_column_const[2];
for (size_t i = 0; i < 2; ++i)
{
if (auto argument_column_const = typeid_cast<const ColumnConst *>(block.getByPosition(arguments[i]).column.get()))
{
columns[i] = typeid_cast<const ColumnAggregateFunction *>(argument_column_const->getDataColumnPtr().get());
is_column_const[i] = true;
}
else
{
columns[i] = typeid_cast<const ColumnAggregateFunction *>(block.getByPosition(arguments[i]).column.get());
is_column_const[i] = false;
}
}
auto col_to = ColumnAggregateFunction::create(columns[0]->getAggregateFunction());
col_to->reserve(input_rows_count);
const PaddedPODArray<AggregateDataPtr> & container0 = columns[0]->getData();
const PaddedPODArray<AggregateDataPtr> & container1 = columns[1]->getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
col_to->insertFrom(columns[0]->getData()[i]);
const AggregateDataPtr data_ptr_0 = is_column_const[0] ? container0[0] : container0[i];
const AggregateDataPtr data_ptr_1 = is_column_const[1] ? container1[0] : container1[i];
col_to->insertFrom(data_ptr_0);
AggregateFunctionGroupBitmapData<T> & bitmap_data_1 = *reinterpret_cast<AggregateFunctionGroupBitmapData<T> *>(col_to->getData()[i]);
const AggregateFunctionGroupBitmapData<T> & bitmap_data_2
= *reinterpret_cast<const AggregateFunctionGroupBitmapData<T> *>(columns[1]->getData()[i]);
= *reinterpret_cast<const AggregateFunctionGroupBitmapData<T> *>(data_ptr_1);
Impl<T>::apply(bitmap_data_1, bitmap_data_2);
}
block.getByPosition(result).column = std::move(col_to);
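The change above lets the bitmap functions accept a constant argument (e.g. a bitmapBuild(...) literal): a constant aggregate column stores a single state, so every logical row has to read index 0. The row-selection pattern, reduced to a self-contained sketch:

#include <cstddef>
#include <vector>

template <typename T>
const T & pickRow(const std::vector<T> & data, bool is_const, std::size_t row)
{
    // A constant column holds one value: every logical row maps to element 0.
    return is_const ? data[0] : data[row];
}

int main()
{
    std::vector<int> full{10, 20, 30};
    std::vector<int> constant{7};
    return (pickRow(full, false, 2) == 30 && pickRow(constant, true, 2) == 7) ? 0 : 1;
}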


@@ -72,9 +72,6 @@ void AsynchronousMetrics::run()
while (true)
{
if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
try
{
update();
@@ -83,6 +80,9 @@
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
break;
}
}
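The reorder above makes the first update() run immediately at startup rather than one full period later, while the quit check still interrupts the wait. The resulting do-work-then-wait shape, as a sketch with simplified timing:

#include <chrono>
#include <condition_variable>
#include <mutex>

void runLoop(std::mutex & mutex, std::condition_variable & cond, bool & quit)
{
    std::unique_lock lock(mutex);
    while (true)
    {
        // update(); // the periodic work now happens before the first wait
        if (cond.wait_for(lock, std::chrono::minutes(1), [&] { return quit; }))
            break; // woken up because quit was set
    }
}

int main()
{
    std::mutex m;
    std::condition_variable cv;
    bool quit = true; // pre-set so this sketch exits after a single iteration
    runLoop(m, cv, quit);
}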


@@ -230,6 +230,16 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
}
NamesAndTypesList ExpressionAnalyzer::sourceWithJoinedColumns() const
{
auto result_columns = sourceColumns();
result_columns.insert(result_columns.end(), array_join_columns.begin(), array_join_columns.end());
result_columns.insert(result_columns.end(),
analyzedJoin().columnsAddedByJoin().begin(), analyzedJoin().columnsAddedByJoin().end());
return result_columns;
}
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
{
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
@@ -313,12 +323,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
}
else
{
NamesAndTypesList temp_columns = sourceColumns();
temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end());
temp_columns.insert(temp_columns.end(),
analyzedJoin().columnsAddedByJoin().begin(), analyzedJoin().columnsAddedByJoin().end());
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, context);
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(sourceWithJoinedColumns(), context);
getRootActions(left_in_operand, true, temp_actions);
Block sample_block_with_calculated_columns = temp_actions->getSampleBlock();
@@ -718,7 +723,7 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain,
step.required_output.push_back(child->getColumnName());
}
bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types)
bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order)
{
const auto * select_query = getSelectQuery();
@@ -739,6 +744,16 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain
step.required_output.push_back(order_expression->getColumnName());
}
if (optimize_read_in_order)
{
auto all_columns = sourceWithJoinedColumns();
for (auto & child : select_query->orderBy()->children)
{
order_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
getRootActions(child, only_types, order_by_elements_actions.back());
}
}
return true;
}


@@ -18,6 +18,7 @@ class Context;
struct ExpressionActionsChain;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
struct ASTTableJoin;
class IJoin;
@@ -46,6 +47,9 @@ struct ExpressionAnalyzerData
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
Tables external_tables;
/// Actions by every element of ORDER BY
ManyExpressionActions order_by_elements_actions;
};
@@ -119,6 +123,7 @@ protected:
const AnalyzedJoin & analyzedJoin() const { return *syntax->analyzed_join; }
const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; }
const std::vector<const ASTFunction *> & aggregates() const { return syntax->aggregates; }
NamesAndTypesList sourceWithJoinedColumns() const;
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
void initGlobalSubqueriesAndExternalTables(bool do_global);
@@ -169,6 +174,8 @@ public:
const PreparedSets & getPreparedSets() const { return prepared_sets; }
const ManyExpressionActions & getOrderByActions() const { return order_by_elements_actions; }
/// Tables that will need to be sent to remote servers for distributed query processing.
const Tables & getExternalTables() const { return external_tables; }
@@ -201,7 +208,7 @@ public:
/// After aggregation:
bool appendHaving(ExpressionActionsChain & chain, bool only_types);
void appendSelect(ExpressionActionsChain & chain, bool only_types);
bool appendOrderBy(ExpressionActionsChain & chain, bool only_types);
bool appendOrderBy(ExpressionActionsChain & chain, bool only_types, bool optimize_read_in_order);
bool appendLimitBy(ExpressionActionsChain & chain, bool only_types);
/// Deletes all columns except mentioned by SELECT, arranges the remaining columns and renames them to aliases.
void appendProjectResult(ExpressionActionsChain & chain) const;


@@ -33,6 +33,7 @@ public:
virtual bool alwaysReturnsEmptySet() const { return false; }
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
virtual bool hasStreamWithNonJoinedRows() const { return false; }
};
using JoinPtr = std::shared_ptr<IJoin>;


@@ -770,10 +770,18 @@ InterpreterSelectQuery::analyzeExpressions(
}
}
bool has_stream_with_non_joined_rows = (res.before_join && res.before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
res.optimize_read_in_order =
context.getSettingsRef().optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joined_rows;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.selected_columns = chain.getLastStep().required_output;
res.has_order_by = query_analyzer.appendOrderBy(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.has_order_by = query_analyzer.appendOrderBy(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage), res.optimize_read_in_order);
res.before_order_and_select = chain.getLastActions();
chain.addStep();
@@ -943,87 +951,6 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
}
static InputSortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, const ASTSelectQuery & query,
const Context & context, const SyntaxAnalyzerResultPtr & global_syntax_result)
{
if (!merge_tree.hasSortingKey())
return {};
auto order_descr = getSortDescription(query, context);
SortDescription order_key_prefix_descr;
int read_direction = order_descr.at(0).direction;
const auto & sorting_key_columns = merge_tree.getSortingKeyColumns();
size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
for (size_t i = 0; i < prefix_size; ++i)
{
if (global_syntax_result->array_join_result_to_source.count(order_descr[i].column_name))
break;
/// Optimize in case of exact match with order key element
/// or in some simple cases when order key element is wrapped into monotonic function.
int current_direction = order_descr[i].direction;
if (order_descr[i].column_name == sorting_key_columns[i] && current_direction == read_direction)
order_key_prefix_descr.push_back(order_descr[i]);
else
{
auto ast = query.orderBy()->children[i]->children.at(0);
auto syntax_result = SyntaxAnalyzer(context).analyze(ast, global_syntax_result->required_source_columns);
auto actions = ExpressionAnalyzer(ast, syntax_result, context).getActions(true);
const auto & input_columns = actions->getRequiredColumnsWithTypes();
if (input_columns.size() != 1 || input_columns.front().name != sorting_key_columns[i])
break;
bool first = true;
for (const auto & action : actions->getActions())
{
if (action.type != ExpressionAction::APPLY_FUNCTION)
continue;
if (!first)
{
current_direction = 0;
break;
}
else
first = false;
const auto & func = *action.function_base;
if (!func.hasInformationAboutMonotonicity())
{
current_direction = 0;
break;
}
auto monotonicity = func.getMonotonicityForRange(*input_columns.front().type, {}, {});
if (!monotonicity.is_monotonic)
{
current_direction = 0;
break;
}
else if (!monotonicity.is_positive)
current_direction *= -1;
}
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
if (i == 0)
read_direction = current_direction;
order_key_prefix_descr.push_back(order_descr[i]);
}
}
if (order_key_prefix_descr.empty())
return {};
return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction);
}
template <typename TPipeline>
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, QueryPipeline & save_context_and_storage)
{
@@ -1044,13 +971,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
const Settings & settings = context->getSettingsRef();
auto & expressions = analysis_result;
InputSortingInfoPtr input_sorting_info;
if (settings.optimize_read_in_order && storage && query.orderBy() && !query_analyzer->hasAggregation() && !query.final() && !query.join())
{
if (const auto * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
input_sorting_info = optimizeReadInOrder(*merge_tree_data, query, *context, syntax_analyzer_result);
}
if (options.only_analyze)
{
if constexpr (pipeline_with_processors)
@@ -1108,7 +1028,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
/** Read the data from Storage. from_stage - the stage up to which the query was processed inside Storage. */
executeFetchColumns(from_stage, pipeline, input_sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere, save_context_and_storage);
executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere, save_context_and_storage);
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(options.to_stage));
}
@@ -1367,7 +1287,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
template <typename TPipeline>
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
const InputSortingInfoPtr & input_sorting_info, const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere,
const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere,
QueryPipeline & save_context_and_storage)
{
constexpr bool pipeline_with_processors = std::is_same<TPipeline, QueryPipeline>::value;
@@ -1691,7 +1611,19 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.syntax_analyzer_result = syntax_analyzer_result;
query_info.sets = query_analyzer->getPreparedSets();
query_info.prewhere_info = prewhere_info;
query_info.input_sorting_info = input_sorting_info;
/// Create the optimizer with prepared actions.
/// input_sorting_info may need to be calculated later, e.g. while reading from StorageMerge.
if (analysis_result.optimize_read_in_order)
{
query_info.order_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
query_analyzer->getOrderByActions(),
getSortDescription(query, *context),
query_info.syntax_analyzer_result);
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
}
BlockInputStreams streams;
Pipes pipes;


@@ -12,6 +12,7 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableStructureLockHolder.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Processors/QueryPipeline.h>
#include <Columns/FilterDescription.h>
@@ -152,6 +153,7 @@ private:
bool has_limit_by = false;
bool remove_where_filter = false;
bool optimize_read_in_order = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
@@ -201,7 +203,7 @@ private:
template <typename TPipeline>
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
const InputSortingInfoPtr & sorting_info, const PrewhereInfoPtr & prewhere_info,
const PrewhereInfoPtr & prewhere_info,
const Names & columns_to_remove_after_prewhere,
QueryPipeline & save_context_and_storage);


@@ -1418,4 +1418,14 @@ BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & result_sam
return {};
}
bool Join::hasStreamWithNonJoinedRows() const
{
if (table_join->strictness() == ASTTableJoin::Strictness::Asof ||
table_join->strictness() == ASTTableJoin::Strictness::Semi)
return false;
return isRightOrFull(table_join->kind());
}
}
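hasStreamWithNonJoinedRows() encodes when a join must emit right-side rows that matched nothing: only RIGHT and FULL kinds, and never under ASOF or SEMI strictness. The same rule restated as a self-contained predicate (enum names illustrative):

enum class Kind { Inner, Left, Right, Full };
enum class Strictness { All, Any, Asof, Semi };

static bool hasNonJoinedStream(Kind kind, Strictness strictness)
{
    // ASOF and SEMI joins never produce non-joined rows.
    if (strictness == Strictness::Asof || strictness == Strictness::Semi)
        return false;
    // Otherwise only RIGHT and FULL joins keep right-side rows that matched nothing.
    return kind == Kind::Right || kind == Kind::Full;
}

int main()
{
    return hasNonJoinedStream(Kind::Full, Strictness::All) ? 0 : 1;
}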


@@ -179,6 +179,7 @@ public:
* left_sample_block is passed without taking the 'use_nulls' setting into account (columns will be converted to Nullable inside).
*/
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
bool hasStreamWithNonJoinedRows() const override;
/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const final;


@@ -30,6 +30,7 @@
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>
namespace DB
@@ -364,7 +365,7 @@ bool StorageKafka::streamToViews()
{
auto table = global_context.getTable(database_name, table_name);
if (!table)
throw Exception("Engine table " + database_name + "." + table_name + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Engine table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();


@@ -12,6 +12,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>


@@ -0,0 +1,113 @@
#include <Storages/ReadInOrderOptimizer.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/AnalyzedJoin.h>
#include <Functions/IFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ReadInOrderOptimizer::ReadInOrderOptimizer(
const ManyExpressionActions & elements_actions_,
const SortDescription & required_sort_description_,
const SyntaxAnalyzerResultPtr & syntax_result)
: elements_actions(elements_actions_)
, required_sort_description(required_sort_description_)
{
if (elements_actions.size() != required_sort_description.size())
throw Exception("Sizes of sort description and actions are mismatched", ErrorCodes::LOGICAL_ERROR);
/// Do not analyze joined columns.
/// They may have aliases and come into the description as is.
/// We could mismatch them with order-key columns at the column-fetching stage.
for (const auto & elem : syntax_result->array_join_result_to_source)
forbidden_columns.insert(elem.first);
}
InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
{
const MergeTreeData * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
if (!merge_tree || !merge_tree->hasSortingKey())
return {};
SortDescription order_key_prefix_descr;
int read_direction = required_sort_description.at(0).direction;
const auto & sorting_key_columns = merge_tree->getSortingKeyColumns();
size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size());
for (size_t i = 0; i < prefix_size; ++i)
{
if (forbidden_columns.count(required_sort_description[i].column_name))
break;
/// Optimize in case of exact match with order key element
/// or in some simple cases when order key element is wrapped into monotonic function.
int current_direction = required_sort_description[i].direction;
if (required_sort_description[i].column_name == sorting_key_columns[i] && current_direction == read_direction)
order_key_prefix_descr.push_back(required_sort_description[i]);
else
{
/// Allow only a single simple monotonic function with one argument
bool found_function = false;
for (const auto & action : elements_actions[i]->getActions())
{
if (action.type != ExpressionAction::APPLY_FUNCTION)
continue;
if (found_function)
{
current_direction = 0;
break;
}
else
found_function = true;
if (action.argument_names.size() != 1 || action.argument_names.at(0) != sorting_key_columns[i])
{
current_direction = 0;
break;
}
const auto & func = *action.function_base;
if (!func.hasInformationAboutMonotonicity())
{
current_direction = 0;
break;
}
auto monotonicity = func.getMonotonicityForRange(*func.getArgumentTypes().at(0), {}, {});
if (!monotonicity.is_monotonic)
{
current_direction = 0;
break;
}
else if (!monotonicity.is_positive)
current_direction *= -1;
}
if (!found_function)
current_direction = 0;
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
if (i == 0)
read_direction = current_direction;
order_key_prefix_descr.push_back(required_sort_description[i]);
}
}
if (order_key_prefix_descr.empty())
return {};
return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction);
}
}
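The loop above decides whether reading in sorting-key order can serve ORDER BY f(key): at most one function application is allowed, it must take the key as its only argument, and its monotonicity either preserves or flips the read direction. The direction arithmetic, reduced to a sketch:

struct Monotonicity { bool is_monotonic = false; bool is_positive = true; };

static int applyWrapper(int direction, const Monotonicity & m, bool already_wrapped)
{
    // A second wrapper or unknown monotonicity cancels the optimization (0 = no order).
    if (already_wrapped || !m.is_monotonic)
        return 0;
    // A decreasing monotonic function flips the read direction.
    return m.is_positive ? direction : -direction;
}

int main()
{
    // ORDER BY negate(key) over a table sorted by key ascending: read backwards.
    return applyWrapper(1, {true, false}, false) == -1 ? 0 : 1;
}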


@@ -0,0 +1,32 @@
#pragma once
#include <Core/SortDescription.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
/** Helper class that analyzes the MergeTree sorting key
* and the required sort description to find their common prefix,
* which is needed to read in primary key order.
*/
class ReadInOrderOptimizer
{
public:
ReadInOrderOptimizer(
const ManyExpressionActions & elements_actions,
const SortDescription & required_sort_description,
const SyntaxAnalyzerResultPtr & syntax_result);
InputSortingInfoPtr getInputOrder(const StoragePtr & storage) const;
private:
/// Actions for every element of the ORDER BY expression, used to analyze functions for monotonicity
ManyExpressionActions elements_actions;
NameSet forbidden_columns;
SortDescription required_sort_description;
};
}


@@ -41,15 +41,25 @@ struct InputSortingInfo
InputSortingInfo(const SortDescription & order_key_prefix_descr_, int direction_)
: order_key_prefix_descr(order_key_prefix_descr_), direction(direction_) {}
bool operator ==(const InputSortingInfo & other) const
{
return order_key_prefix_descr == other.order_key_prefix_descr && direction == other.direction;
}
bool operator !=(const InputSortingInfo & other) const { return !(*this == other); }
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputSortingInfoPtr = std::shared_ptr<InputSortingInfo>;
using InputSortingInfoPtr = std::shared_ptr<const InputSortingInfo>;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
@@ -62,7 +72,9 @@ struct SelectQueryInfo
PrewhereInfoPtr prewhere_info;
InputSortingInfoPtr input_sorting_info;
ReadInOrderOptimizerPtr order_by_optimizer;
/// We can modify it while reading from storage
mutable InputSortingInfoPtr input_sorting_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)


@@ -165,6 +165,9 @@ BlockInputStreams StorageBuffer::read(
if (dst_has_same_structure)
{
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}


@@ -15,6 +15,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/StorageFactory.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Common/typeid_cast.h>
@@ -200,6 +201,9 @@ BlockInputStreams StorageMaterializedView::read(
{
auto storage = getTargetTable();
auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId());
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & stream : streams)
stream->addTableLock(lock);


@@ -209,6 +209,24 @@ BlockInputStreams StorageMerge::read(
num_streams *= num_streams_multiplier;
size_t remaining_streams = num_streams;
InputSortingInfoPtr input_sorting_info;
if (query_info.order_by_optimizer)
{
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
auto current_info = query_info.order_by_optimizer->getInputOrder(it->first);
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
input_sorting_info.reset();
if (!input_sorting_info)
break;
}
query_info.input_sorting_info = input_sorting_info;
}
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count);
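Since StorageMerge reads several underlying tables, the block above keeps an input order only when every selected table reports an identical one; any disagreement or missing order falls back to unordered reads. The intersection rule as a generic sketch:

#include <cstddef>
#include <memory>
#include <vector>

template <typename Info>
std::shared_ptr<const Info> commonOrder(const std::vector<std::shared_ptr<const Info>> & per_table)
{
    std::shared_ptr<const Info> common;
    for (std::size_t i = 0; i < per_table.size(); ++i)
    {
        if (i == 0)
            common = per_table[i];                      // start with the first table's order
        else if (!per_table[i] || !(*per_table[i] == *common))
            common = nullptr;                           // any disagreement drops the order
        if (!common)
            break;
    }
    return common;
}

int main()
{
    std::vector<std::shared_ptr<const int>> orders{std::make_shared<int>(1), std::make_shared<int>(1)};
    auto common = commonOrder(orders);
    return (common && *common == 1) ? 0 : 1;
}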


@@ -56,12 +56,12 @@ def get_and_check_metrics():
def test_prometheus_endpoint(start_cluster):
metrics_dict = get_and_check_metrics()
assert metrics_dict['ClickHouseProfileEventsQuery'] >= 0
prev_query_count = metrics_dict['ClickHouseProfileEventsQuery']
assert metrics_dict['ClickHouseProfileEvents_Query'] >= 0
prev_query_count = metrics_dict['ClickHouseProfileEvents_Query']
resp = node.query("SELECT 1")
resp = node.query("SELECT 2")
resp = node.query("SELECT 3")
metrics_dict = get_and_check_metrics()
assert metrics_dict['ClickHouseProfileEventsQuery'] >= prev_query_count + 3
assert metrics_dict['ClickHouseProfileEvents_Query'] >= prev_query_count + 3


@@ -13,15 +13,19 @@
70
2019-01-01 50 [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50]
2019-01-02 60 [11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70]
2019-01-03 10 [1,2,3,4,5,6,7,8,9,10]
60 50 70 40 20 30
60 50 70 40 20 30
90
90
100
100
20
90
100
20
[1,2,3]
[1,2,3]
2019-01-01 50
2019-01-02 60
2019-01-03 10
1
1
1


@@ -15,6 +15,7 @@ DROP TABLE IF EXISTS bitmap_test;
CREATE TABLE bitmap_test(pickup_date Date, city_id UInt32, uid UInt32)ENGINE = Memory;
INSERT INTO bitmap_test SELECT '2019-01-01', 1, number FROM numbers(1,50);
INSERT INTO bitmap_test SELECT '2019-01-02', 1, number FROM numbers(11,60);
INSERT INTO bitmap_test SELECT '2019-01-03', 2, number FROM numbers(1,10);
SELECT groupBitmap( uid ) AS user_num FROM bitmap_test;
@@ -65,6 +66,9 @@ SELECT count(*) FROM bitmap_test WHERE bitmapContains((SELECT groupBitmapState(u
SELECT count(*) FROM bitmap_test WHERE 0 = bitmapContains((SELECT groupBitmapState(uid) FROM bitmap_test WHERE pickup_date = '2019-01-01'), uid);
-- PR#8082
SELECT bitmapToArray(bitmapAnd(groupBitmapState(uid), bitmapBuild(CAST([1, 2, 3], 'Array(UInt32)')))) FROM bitmap_test GROUP BY city_id;
-- bitmap state test
DROP TABLE IF EXISTS bitmap_state_test;
CREATE TABLE bitmap_state_test


@@ -0,0 +1,41 @@
---StorageMerge---
0
0
0
0
0
0 0
0 10000
0 1000000
0 100000000
0 1001089600
0 1002355600
0 1003622400
0 100400400
0 1004890000
0 1006158400
OK
---StorageBuffer---
1 0
1 1000000
1 1000000000
1 1000000000000
1 100026577288000
1 100155921984000
1 10021812416000
1 100285378136000
1 100414945792000
1 10049728312000
OK
---MaterializedView---
0 0
0 10000
0 1000000
0 100000000
14 1000267129
28 1000709956
0 1001089600
14 100140049
14 1001532609
28 1001975716
OK


@@ -0,0 +1,62 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS s1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS s2"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS m"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS buf"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS mv"
$CLICKHOUSE_CLIENT -q "CREATE TABLE s1 (a UInt32, s String) ENGINE = MergeTree ORDER BY a PARTITION BY a % 3"
$CLICKHOUSE_CLIENT -q "CREATE TABLE s2 (a UInt32, s String) ENGINE = MergeTree ORDER BY a PARTITION BY a % 3"
$CLICKHOUSE_CLIENT -q "CREATE TABLE m (a UInt32, s String) engine = Merge(currentDatabase(), 's[1,2]')"
$CLICKHOUSE_CLIENT -q "INSERT INTO s1 select (number % 20) * 2 as n, toString(number * number) from numbers(100000)"
$CLICKHOUSE_CLIENT -q "INSERT INTO s2 select (number % 20) * 2 + 1 as n, toString(number * number * number) from numbers(100000)"
$CLICKHOUSE_CLIENT -q "SELECT '---StorageMerge---'"
$CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 5"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM m ORDER BY a, s LIMIT 10"
# Not a plain .sql test with max_rows_to_read, because that setting doesn't work with Merge storage
rows_read=`$CLICKHOUSE_CLIENT -q "SELECT a FROM m ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 | grep "rows_read" | sed 's/[^0-9]*//g'`
# Expected number of rows read, with some margin
if [[ $rows_read -lt 500 ]]
then echo "OK"
else
echo "FAIL"
fi
$CLICKHOUSE_CLIENT -q "SELECT '---StorageBuffer---'"
$CLICKHOUSE_CLIENT -q "CREATE TABLE buf (a UInt32, s String) engine = Buffer(currentDatabase(), s2, 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM buf ORDER BY a, s LIMIT 10"
rows_read=`$CLICKHOUSE_CLIENT -q "SELECT a FROM buf ORDER BY a LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 | grep "rows_read" | sed 's/[^0-9]*//g'`
# Expected number of rows read, with some margin
if [[ $rows_read -lt 500 ]]
then echo "OK"
else
echo "FAIL"
fi
$CLICKHOUSE_CLIENT -q "SELECT '---MaterializedView---'"
$CLICKHOUSE_CLIENT -q "CREATE MATERIALIZED VIEW mv (a UInt32, s String) engine = MergeTree ORDER BY s POPULATE AS SELECT a, s FROM s1 WHERE a % 7 = 0"
$CLICKHOUSE_CLIENT -q "SELECT a, s FROM mv ORDER BY s LIMIT 10"
rows_read=`$CLICKHOUSE_CLIENT -q "SELECT a, s FROM mv ORDER BY s LIMIT 10 FORMAT JSON" --max_threads=1 --max_block_size=20 | grep "rows_read" | sed 's/[^0-9]*//g'`
if [[ $rows_read -lt 500 ]]
then echo "OK"
else
echo "FAIL"
fi
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS s1"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS s2"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS m"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS buf"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS mv"