Merge branch 'master' into use-dag-in-key-condition

Nikolai Kochetov 2022-07-20 17:38:33 +02:00 committed by GitHub
commit 4e8cd70b1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
61 changed files with 1153 additions and 184 deletions

View File

@ -362,6 +362,8 @@ else
# FIXME Not sure if it's expected, but some tests from BC check may not be finished yet when we restarting server.
# Let's just ignore all errors from queries ("} <Error> TCPHandler: Code:", "} <Error> executeQuery: Code:")
# FIXME https://github.com/ClickHouse/ClickHouse/issues/39197 ("Missing columns: 'v3' while processing query: 'v3, k, v1, v2, p'")
# NOTE Incompatibility was introduced in https://github.com/ClickHouse/ClickHouse/pull/39263, it's expected
# ("This engine is deprecated and is not supported in transactions", "[Queue = DB::MergeMutateRuntimeQueue]: Code: 235. DB::Exception: Part")
echo "Check for Error messages in server log:"
zgrep -Fav -e "Code: 236. DB::Exception: Cancelled merging parts" \
-e "Code: 236. DB::Exception: Cancelled mutating parts" \
@ -389,6 +391,8 @@ else
-e "} <Error> TCPHandler: Code:" \
-e "} <Error> executeQuery: Code:" \
-e "Missing columns: 'v3' while processing query: 'v3, k, v1, v2, p'" \
-e "This engine is deprecated and is not supported in transactions" \
-e "[Queue = DB::MergeMutateRuntimeQueue]: Code: 235. DB::Exception: Part" \
/var/log/clickhouse-server/clickhouse-server.backward.clean.log | zgrep -Fa "<Error>" > /test_output/bc_check_error_messages.txt \
&& echo -e 'Backward compatibility check: Error message in clickhouse-server.log (see bc_check_error_messages.txt)\tFAIL' >> /test_output/test_results.tsv \
|| echo -e 'Backward compatibility check: No Error messages in clickhouse-server.log\tOK' >> /test_output/test_results.tsv

View File

@ -46,6 +46,9 @@ def get_options(i, backward_compatibility_check):
if i == 13:
client_options.append("memory_tracker_fault_probability=0.001")
if i % 2 == 1 and not backward_compatibility_check:
client_options.append("group_by_use_nulls=1")
if client_options:
options.append(" --client-option " + " ".join(client_options))

View File

@ -1632,6 +1632,8 @@ kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent';
-- for debug purposes you can set format_avro_schema_registry_url in a session.
-- this way cannot be used in production
SET format_avro_schema_registry_url = 'http://schema-registry';
SELECT * FROM topic1_stream;

View File

@ -2626,7 +2626,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 1800.
Default value: 180.
## http_receive_timeout {#http_receive_timeout}
@ -2637,7 +2637,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 1800.
Default value: 180.
## check_query_single_value_result {#check_query_single_value_result}

View File

@ -793,4 +793,18 @@ ColumnPtr makeNullable(const ColumnPtr & column)
return ColumnNullable::create(column, ColumnUInt8::create(column->size(), 0));
}
ColumnPtr makeNullableSafe(const ColumnPtr & column)
{
if (isColumnNullable(*column))
return column;
if (isColumnConst(*column))
return ColumnConst::create(makeNullableSafe(assert_cast<const ColumnConst &>(*column).getDataColumnPtr()), column->size());
if (column->canBeInsideNullable())
return makeNullable(column);
return column;
}
}

View File

@ -223,5 +223,6 @@ private:
};
ColumnPtr makeNullable(const ColumnPtr & column);
ColumnPtr makeNullableSafe(const ColumnPtr & column);
}

View File

@ -1,9 +1,7 @@
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <dlfcn.h>
#include <unistd.h>
#include <ctime>
#include <csignal>
#include <Common/logger_useful.h>
@ -13,6 +11,7 @@
#include <Common/PipeFDs.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <Common/waitForPid.h>
namespace
@ -94,53 +93,15 @@ ShellCommand::~ShellCommand()
bool ShellCommand::tryWaitProcessWithTimeout(size_t timeout_in_seconds)
{
int status = 0;
LOG_TRACE(getLogger(), "Try wait for shell command pid {} with timeout {}", pid, timeout_in_seconds);
wait_called = true;
struct timespec interval {.tv_sec = 1, .tv_nsec = 0};
in.close();
out.close();
err.close();
if (timeout_in_seconds == 0)
{
/// If there is no timeout before signal try to waitpid 1 time without block so we can avoid sending
/// signal if process is already normally terminated.
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
return process_terminated_normally;
}
/// If timeout is positive try waitpid without block in loop until
/// process is normally terminated or waitpid return error
while (timeout_in_seconds != 0)
{
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
if (process_terminated_normally)
{
return true;
}
else if (waitpid_res == 0)
{
--timeout_in_seconds;
nanosleep(&interval, nullptr);
continue;
}
else if (waitpid_res == -1 && errno != EINTR)
{
return false;
}
}
return false;
return waitForPid(pid, timeout_in_seconds);
}
void ShellCommand::logCommand(const char * filename, char * const argv[])

src/Common/waitForPid.cpp (new file, 192 lines)
View File

@ -0,0 +1,192 @@
#include <Common/waitForPid.h>
#include <Common/VersionNumber.h>
#include <Poco/Environment.h>
#include <Common/Stopwatch.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wgnu-statement-expression"
#define HANDLE_EINTR(x) ({ \
decltype(x) eintr_wrapper_result; \
do { \
eintr_wrapper_result = (x); \
} while (eintr_wrapper_result == -1 && errno == EINTR); \
eintr_wrapper_result; \
})
#if defined(OS_LINUX)
#include <poll.h>
#include <string>
#if !defined(__NR_pidfd_open)
#if defined(__x86_64__)
#define SYS_pidfd_open 434
#elif defined(__aarch64__)
#define SYS_pidfd_open 434
#elif defined(__ppc64__)
#define SYS_pidfd_open 434
#elif defined(__riscv)
#define SYS_pidfd_open 434
#else
#error "Unsupported architecture"
#endif
#else
#define SYS_pidfd_open __NR_pidfd_open
#endif
namespace DB
{
static int syscall_pidfd_open(pid_t pid)
{
// pidfd_open cannot be interrupted, no EINTR handling
return syscall(SYS_pidfd_open, pid, 0);
}
static int dir_pidfd_open(pid_t pid)
{
std::string path = "/proc/" + std::to_string(pid);
return HANDLE_EINTR(open(path.c_str(), O_DIRECTORY));
}
static bool supportsPidFdOpen()
{
VersionNumber pidfd_open_minimal_version(5, 3, 0);
VersionNumber linux_version(Poco::Environment::osVersion());
return linux_version >= pidfd_open_minimal_version;
}
static int pidFdOpen(pid_t pid)
{
// use pidfd_open or just plain old /proc/[pid] open for Linux
if (supportsPidFdOpen())
{
return syscall_pidfd_open(pid);
}
else
{
return dir_pidfd_open(pid);
}
}
static int pollPid(pid_t pid, int timeout_in_ms)
{
struct pollfd pollfd;
int pid_fd = pidFdOpen(pid);
if (pid_fd == -1)
{
return false;
}
pollfd.fd = pid_fd;
pollfd.events = POLLIN;
int ready = poll(&pollfd, 1, timeout_in_ms);
int save_errno = errno;
close(pid_fd);
errno = save_errno;
return ready;
}
#elif defined(OS_DARWIN) || defined(OS_FREEBSD)
#include <sys/event.h>
#include <err.h>
namespace DB
{
static int pollPid(pid_t pid, int timeout_in_ms)
{
int status = 0;
int kq = HANDLE_EINTR(kqueue());
if (kq == -1)
{
return false;
}
struct kevent change = {.ident = NULL};
EV_SET(&change, pid, EVFILT_PROC, EV_ADD, NOTE_EXIT, 0, NULL);
int result = HANDLE_EINTR(kevent(kq, &change, 1, NULL, 0, NULL));
if (result == -1)
{
if (errno != ESRCH)
{
return false;
}
// check if pid already died while we called kevent()
if (waitpid(pid, &status, WNOHANG) == pid)
{
return true;
}
return false;
}
struct kevent event = {.ident = NULL};
struct timespec remaining_timespec = {.tv_sec = timeout_in_ms / 1000, .tv_nsec = (timeout_in_ms % 1000) * 1000000};
int ready = kevent(kq, nullptr, 0, &event, 1, &remaining_timespec);
int save_errno = errno;
close(kq);
errno = save_errno;
return ready;
}
#else
#error "Unsupported OS type"
#endif
bool waitForPid(pid_t pid, size_t timeout_in_seconds)
{
int status = 0;
Stopwatch watch;
if (timeout_in_seconds == 0)
{
/// If there is no timeout before signal try to waitpid 1 time without block so we can avoid sending
/// signal if process is already normally terminated.
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
return process_terminated_normally;
}
/// If timeout is positive try waitpid without block in loop until
/// process is normally terminated or waitpid return error
int timeout_in_ms = timeout_in_seconds * 1000;
while (timeout_in_ms > 0)
{
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
if (process_terminated_normally)
{
return true;
}
else if (waitpid_res == 0)
{
watch.restart();
int ready = pollPid(pid, timeout_in_ms);
if (ready <= 0)
{
if (errno == EINTR || errno == EAGAIN)
{
timeout_in_ms -= watch.elapsedMilliseconds();
}
else
{
return false;
}
}
continue;
}
else if (waitpid_res == -1 && errno != EINTR)
{
return false;
}
}
return false;
}
}
#pragma GCC diagnostic pop
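
For readers unfamiliar with the statement-expression trick above, the following standalone function (a hypothetical illustration, not part of the commit) spells out what HANDLE_EINTR(read(fd, buf, len)) expands to: repeat the call while it fails with EINTR and keep the final result.

#include <unistd.h>
#include <cerrno>

/// Hand-written equivalent of HANDLE_EINTR(read(fd, buf, len)):
/// retry the syscall while it is interrupted by a signal, return the last result.
ssize_t readRetryingOnEINTR(int fd, char * buf, size_t len)
{
    ssize_t res;
    do
    {
        res = read(fd, buf, len);
    } while (res == -1 && errno == EINTR);
    return res;
}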

src/Common/waitForPid.h (new file, 12 lines)
View File

@ -0,0 +1,12 @@
#pragma once
#include <sys/types.h>
namespace DB
{
/*
* Waits for a specific pid with timeout, using modern Linux and OSX facilities
* Returns `true` if process terminated successfully or `false` otherwise
*/
bool waitForPid(pid_t pid, size_t timeout_in_seconds);
}
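
A minimal sketch of the intended contract of the new helper (hypothetical caller, not part of the commit): a return value of true means the child exited normally and was already reaped by the internal waitpid call, false means the caller still has to deal with the process.

#include <Common/waitForPid.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>

/// Give the child process a grace period; if it is still running afterwards, kill and reap it.
void terminateWithGracePeriod(pid_t child, size_t grace_seconds)
{
    if (DB::waitForPid(child, grace_seconds))
        return;                       /// exited normally; waitForPid already reaped it via waitpid

    kill(child, SIGKILL);             /// timed out (or wait failed): terminate forcefully
    waitpid(child, nullptr, 0);       /// reap to avoid leaving a zombie
}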

View File

@ -132,6 +132,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.", 0) \
M(Bool, enable_positional_arguments, true, "Enable positional arguments in ORDER BY, GROUP BY and LIMIT BY", 0) \
\
M(Bool, group_by_use_nulls, false, "Treat columns mentioned in ROLLUP, CUBE or GROUPING SETS as Nullable", 0) \
\
M(UInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.", 0) \
M(UInt64, parallel_replicas_count, 0, "", 0) \
M(UInt64, parallel_replica_offset, 0, "", 0) \
@ -759,7 +761,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
\
M(UInt64, cross_to_inner_join_rewrite, 1, "Use inner join instead of comma/cross join if possible. Possible values: 0 - no rewrite, 1 - apply if possible, 2 - force rewrite all cross joins", 0) \
M(UInt64, cross_to_inner_join_rewrite, 2, "Use inner join instead of comma/cross join if possible. Possible values: 0 - no rewrite, 1 - apply if possible for comma/cross, 2 - force rewrite all comma joins, cross - if possible", 0) \
\
M(Bool, output_format_arrow_low_cardinality_as_dictionary, false, "Enable output LowCardinality type as Dictionary Arrow type", 0) \
M(Bool, output_format_arrow_string_as_string, false, "Use Arrow String type instead of Binary for String columns", 0) \

View File

@ -85,6 +85,13 @@ DataTypePtr makeNullable(const DataTypePtr & type)
return std::make_shared<DataTypeNullable>(type);
}
DataTypePtr makeNullableSafe(const DataTypePtr & type)
{
if (type->canBeInsideNullable())
return makeNullable(type);
return type;
}
DataTypePtr removeNullable(const DataTypePtr & type)
{
if (type->isNullable())

View File

@ -51,6 +51,7 @@ private:
DataTypePtr makeNullable(const DataTypePtr & type);
DataTypePtr makeNullableSafe(const DataTypePtr & type);
DataTypePtr removeNullable(const DataTypePtr & type);
}
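
A short sketch (hypothetical snippet, not from the commit) of what the "safe" variant does at the type level: types that may live inside Nullable get wrapped, while types that may not, such as Array, are returned unchanged instead of throwing the way constructing Nullable(Array) directly would.

#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <memory>

using namespace DB;

void makeNullableSafeExample()
{
    DataTypePtr num = std::make_shared<DataTypeUInt64>();
    DataTypePtr arr = std::make_shared<DataTypeArray>(num);

    DataTypePtr a = makeNullableSafe(num);   /// Nullable(UInt64)
    DataTypePtr b = makeNullableSafe(arr);   /// still Array(UInt64): Array cannot be inside Nullable,
                                             /// so the type is passed through untouched
}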

View File

@ -532,6 +532,12 @@ inline bool isBool(const DataTypePtr & data_type)
return data_type->getName() == "Bool";
}
inline bool isAggregateFunction(const DataTypePtr & data_type)
{
WhichDataType which(data_type);
return which.isAggregateFunction();
}
template <typename DataType> constexpr bool IsDataTypeDecimal = false;
template <typename DataType> constexpr bool IsDataTypeNumber = false;
template <typename DataType> constexpr bool IsDataTypeDateOrDateTime = false;

View File

@ -66,7 +66,7 @@ ColumnsDescription readSchemaFromFormat(
}
catch (const DB::Exception & e)
{
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, e.message());
}
}
else if (FormatFactory::instance().checkIfFormatHasSchemaReader(format_name))
@ -75,16 +75,29 @@ ColumnsDescription readSchemaFromFormat(
SchemaReaderPtr schema_reader;
size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
size_t iterations = 0;
while ((buf = read_buffer_iterator()))
while (true)
{
bool is_eof = false;
try
{
buf = read_buffer_iterator();
if (!buf)
break;
is_eof = buf->eof();
}
catch (...)
{
auto exception_message = getCurrentExceptionMessage(false);
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file: {}. You can specify the structure manually", format_name, exception_message);
}
++iterations;
if (buf->eof())
if (is_eof)
{
auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty", format_name);
if (!retry)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, exception_message);
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "{}. You can specify the structure manually", exception_message);
exception_messages += "\n" + exception_message;
continue;
@ -118,14 +131,14 @@ ColumnsDescription readSchemaFromFormat(
}
if (!retry || !isRetryableSchemaInferenceError(getCurrentExceptionCode()))
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, exception_message);
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}. You can specify the structure manually", format_name, exception_message);
exception_messages += "\n" + exception_message;
}
}
if (names_and_types.empty())
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}", exception_messages);
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:{}\nYou can specify the structure manually", exception_messages);
/// If we have "INSERT SELECT" query then try to order
/// columns as they are ordered in table schema for formats

View File

@ -989,9 +989,15 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
if (s.rfind("processor", 0) == 0)
{
/// s390x example: processor 0: version = FF, identification = 039C88, machine = 3906
/// non s390x example: processor : 0
if (auto colon = s.find_first_of(':'))
{
#ifdef __s390x__
core_id = std::stoi(s.substr(10)); /// 10: length of "processor" plus 1
#else
core_id = std::stoi(s.substr(colon + 2));
#endif
}
}
else if (s.rfind("cpu MHz", 0) == 0)
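
As a standalone illustration of the two branches in the #ifdef above (hypothetical inputs, not part of the commit): on s390x the core id directly follows the word "processor", elsewhere it follows the colon.

#include <string>
#include <cassert>

void cpuinfoCoreIdExample()
{
    /// s390x /proc/cpuinfo line: skip the 10 characters of "processor " and let stoi stop at ':'.
    std::string s390x = "processor 0: version = FF, identification = 039C88, machine = 3906";
    assert(std::stoi(s390x.substr(10)) == 0);

    /// Usual /proc/cpuinfo line: the id starts two characters after the colon.
    std::string common = "processor\t: 7";
    auto colon = common.find_first_of(':');
    assert(std::stoi(common.substr(colon + 2)) == 7);
}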

View File

@ -39,7 +39,10 @@ struct JoinedElement
: element(table_element)
{
if (element.table_join)
{
join = element.table_join->as<ASTTableJoin>();
original_kind = join->kind;
}
}
void checkTableName(const DatabaseAndTableWithAlias & table, const String & current_database) const
@ -61,6 +64,8 @@ struct JoinedElement
join->kind = ASTTableJoin::Kind::Cross;
}
ASTTableJoin::Kind getOriginalKind() const { return original_kind; }
bool rewriteCrossToInner(ASTPtr on_expression)
{
if (join->kind != ASTTableJoin::Kind::Cross)
@ -83,6 +88,8 @@ struct JoinedElement
private:
const ASTTablesInSelectQueryElement & element;
ASTTableJoin * join = nullptr;
ASTTableJoin::Kind original_kind;
};
bool isAllowedToRewriteCrossJoin(const ASTPtr & node, const Aliases & aliases)
@ -251,10 +258,17 @@ void CrossToInnerJoinMatcher::visit(ASTSelectQuery & select, ASTPtr &, Data & da
}
}
if (data.cross_to_inner_join_rewrite > 1 && !rewritten)
if (joined.getOriginalKind() == ASTTableJoin::Kind::Comma &&
data.cross_to_inner_join_rewrite > 1 &&
!rewritten)
{
throw Exception(ErrorCodes::INCORRECT_QUERY, "Failed to rewrite '{} WHERE {}' to INNER JOIN",
query_before, queryToString(select.where()));
throw Exception(
ErrorCodes::INCORRECT_QUERY,
"Failed to rewrite comma join to INNER. "
"Please, try to simplify WHERE section "
"or set the setting `cross_to_inner_join_rewrite` to 1 to allow slow CROSS JOIN for this case "
"(cannot rewrite '{} WHERE {}' to INNER JOIN)",
query_before, queryToString(select.where()));
}
}
}

View File

@ -45,6 +45,9 @@
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnNullable.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <DataTypes/IDataType.h>
#include <Core/SettingsEnums.h>
#include <Core/ColumnNumbers.h>
#include <Core/Names.h>
@ -345,6 +348,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
group_by_kind = GroupByKind::GROUPING_SETS;
else
group_by_kind = GroupByKind::ORDINARY;
bool use_nulls = group_by_kind != GroupByKind::ORDINARY && getContext()->getSettingsRef().group_by_use_nulls;
/// For GROUPING SETS with multiple groups we always add virtual __grouping_set column
/// With set number, which is used as an additional key at the stage of merging aggregating data.
@ -399,7 +403,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
}
}
NameAndTypePair key{column_name, node->result_type};
NameAndTypePair key{column_name, use_nulls ? makeNullableSafe(node->result_type) : node->result_type };
grouping_set_list.push_back(key);
@ -453,7 +457,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
}
}
NameAndTypePair key{column_name, node->result_type};
NameAndTypePair key = NameAndTypePair{ column_name, use_nulls ? makeNullableSafe(node->result_type) : node->result_type };
/// Aggregation keys are uniqued.
if (!unique_keys.contains(key.name))
@ -1489,6 +1493,28 @@ void SelectQueryExpressionAnalyzer::appendExpressionsAfterWindowFunctions(Expres
}
}
void SelectQueryExpressionAnalyzer::appendGroupByModifiers(ActionsDAGPtr & before_aggregation, ExpressionActionsChain & chain, bool /* only_types */)
{
const auto * select_query = getAggregatingQuery();
if (!select_query->groupBy() || !(select_query->group_by_with_rollup || select_query->group_by_with_cube))
return;
auto source_columns = before_aggregation->getResultColumns();
ColumnsWithTypeAndName result_columns;
for (const auto & source_column : source_columns)
{
if (source_column.type->canBeInsideNullable())
result_columns.emplace_back(makeNullableSafe(source_column.type), source_column.name);
else
result_columns.push_back(source_column);
}
ExpressionActionsChain::Step & step = chain.lastStep(before_aggregation->getNamesAndTypesList());
step.actions() = ActionsDAG::makeConvertingActions(source_columns, result_columns, ActionsDAG::MatchColumnsMode::Position);
}
void SelectQueryExpressionAnalyzer::appendSelectSkipWindowExpressions(ExpressionActionsChain::Step & step, ASTPtr const & node)
{
if (auto * function = node->as<ASTFunction>())
@ -1956,6 +1982,9 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
before_aggregation = chain.getLastActions();
if (settings.group_by_use_nulls)
query_analyzer.appendGroupByModifiers(before_aggregation, chain, only_types);
finalize_chain(chain);
if (query_analyzer.appendHaving(chain, only_types || !second_stage))

View File

@ -412,6 +412,8 @@ private:
void appendExpressionsAfterWindowFunctions(ExpressionActionsChain & chain, bool only_types);
void appendSelectSkipWindowExpressions(ExpressionActionsChain::Step & step, ASTPtr const & node);
void appendGroupByModifiers(ActionsDAGPtr & before_aggregation, ExpressionActionsChain & chain, bool only_types);
/// After aggregation:
bool appendHaving(ExpressionActionsChain & chain, bool only_types);
/// appendSelect

View File

@ -786,8 +786,16 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (analysis_result.use_grouping_set_key)
res.insert({ nullptr, std::make_shared<DataTypeUInt64>(), "__grouping_set" });
for (const auto & key : query_analyzer->aggregationKeys())
res.insert({nullptr, header.getByName(key.name).type, key.name});
if (context->getSettingsRef().group_by_use_nulls && analysis_result.use_grouping_set_key)
{
for (const auto & key : query_analyzer->aggregationKeys())
res.insert({nullptr, makeNullableSafe(header.getByName(key.name).type), key.name});
}
else
{
for (const auto & key : query_analyzer->aggregationKeys())
res.insert({nullptr, header.getByName(key.name).type, key.name});
}
for (const auto & aggregate : query_analyzer->aggregates())
{
@ -2326,6 +2334,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
merge_threads,
temporary_data_merge_threads,
storage_has_evenly_distributed_read,
settings.group_by_use_nulls,
std::move(group_by_info),
std::move(group_by_sort_description),
should_produce_results_in_order_of_bucket_number);
@ -2402,9 +2411,9 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
QueryPlanStepPtr step;
if (modificator == Modificator::ROLLUP)
step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(params), final);
step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(params), final, settings.group_by_use_nulls);
else if (modificator == Modificator::CUBE)
step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(params), final);
step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(params), final, settings.group_by_use_nulls);
query_plan.addStep(std::move(step));
}

View File

@ -73,7 +73,6 @@ void IOutputFormat::work()
setRowsBeforeLimit(rows_before_limit_counter->get());
finalize();
finalized = true;
return;
}
@ -120,9 +119,12 @@ void IOutputFormat::write(const Block & block)
void IOutputFormat::finalize()
{
if (finalized)
return;
writePrefixIfNot();
writeSuffixIfNot();
finalizeImpl();
finalized = true;
}
}
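
The guard added above makes finalize() idempotent; the pattern in isolation looks like this (hypothetical minimal struct, not from the commit):

struct FinalizingOutput
{
    bool finalized = false;

    void finalize()
    {
        if (finalized)
            return;          /// calling finalize() a second time is now a harmless no-op
        /// ... writePrefixIfNot(), writeSuffixIfNot(), finalizeImpl() would go here ...
        finalized = true;
    }
};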

View File

@ -11,6 +11,7 @@
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
#include <Interpreters/Aggregator.h>
#include <Functions/FunctionFactory.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/DataTypesNumber.h>
@ -46,22 +47,32 @@ Block appendGroupingSetColumn(Block header)
return res;
}
static Block appendGroupingColumn(Block block, const GroupingSetsParamsList & params)
static inline void convertToNullable(Block & header, const Names & keys)
{
for (const auto & key : keys)
{
auto & column = header.getByName(key);
column.type = makeNullableSafe(column.type);
column.column = makeNullableSafe(column.column);
}
}
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls)
{
auto header = appendGroupingSetColumn(input_header);
if (use_nulls)
convertToNullable(header, keys);
return header;
}
static Block appendGroupingColumn(Block block, const Names & keys, const GroupingSetsParamsList & params, bool use_nulls)
{
if (params.empty())
return block;
Block res;
size_t rows = block.rows();
auto column = ColumnUInt64::create(rows);
res.insert({ColumnPtr(std::move(column)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
for (auto & col : block)
res.insert(std::move(col));
return res;
return generateOutputHeader(block, keys, use_nulls);
}
AggregatingStep::AggregatingStep(
@ -74,11 +85,12 @@ AggregatingStep::AggregatingStep(
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
bool group_by_use_nulls_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_,
bool should_produce_results_in_order_of_bucket_number_)
: ITransformingStep(
input_stream_, appendGroupingColumn(params_.getHeader(input_stream_.header, final_), grouping_sets_params_), getTraits(should_produce_results_in_order_of_bucket_number_), false)
input_stream_, appendGroupingColumn(params_.getHeader(input_stream_.header, final_), params_.keys, grouping_sets_params_, group_by_use_nulls_), getTraits(should_produce_results_in_order_of_bucket_number_), false)
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(final_)
@ -87,6 +99,7 @@ AggregatingStep::AggregatingStep(
, merge_threads(merge_threads_)
, temporary_data_merge_threads(temporary_data_merge_threads_)
, storage_has_evenly_distributed_read(storage_has_evenly_distributed_read_)
, group_by_use_nulls(group_by_use_nulls_)
, group_by_info(std::move(group_by_info_))
, group_by_sort_description(std::move(group_by_sort_description_))
, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
@ -217,6 +230,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
assert(ports.size() == grouping_sets_size);
auto output_header = transform_params->getHeader();
if (group_by_use_nulls)
convertToNullable(output_header, params.keys);
for (size_t set_counter = 0; set_counter < grouping_sets_size; ++set_counter)
{
@ -236,6 +251,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
@ -251,7 +267,13 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
index.push_back(node);
}
else
index.push_back(dag->getIndex()[header.getPositionByName(col.name)]);
{
const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)];
if (group_by_use_nulls && column_node->result_type->canBeInsideNullable())
index.push_back(&dag->addFunction(to_nullable_function, { column_node }, col.name));
else
index.push_back(column_node);
}
}
dag->getIndex().swap(index);
@ -396,7 +418,7 @@ void AggregatingStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(),
appendGroupingColumn(params.getHeader(input_streams.front().header, final), grouping_sets_params),
appendGroupingColumn(params.getHeader(input_streams.front().header, final), params.keys, grouping_sets_params, group_by_use_nulls),
getDataStreamTraits());
}

View File

@ -20,6 +20,7 @@ struct GroupingSetsParams
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls);
/// Aggregation. See AggregatingTransform.
class AggregatingStep : public ITransformingStep
@ -35,6 +36,7 @@ public:
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
bool group_by_use_nulls_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_,
bool should_produce_results_in_order_of_bucket_number_);
@ -62,6 +64,7 @@ private:
size_t temporary_data_merge_threads;
bool storage_has_evenly_distributed_read;
bool group_by_use_nulls;
InputOrderInfoPtr group_by_info;
SortDescription group_by_sort_description;

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/AggregatingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
@ -24,27 +25,41 @@ static ITransformingStep::Traits getTraits()
};
}
CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_)
: ITransformingStep(input_stream_, appendGroupingSetColumn(params_.getHeader(input_stream_.header, final_)), getTraits())
CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool use_nulls_)
: ITransformingStep(input_stream_, generateOutputHeader(params_.getHeader(input_stream_.header, final_), params_.keys, use_nulls_), getTraits())
, keys_size(params_.keys_size)
, params(std::move(params_))
, final(final_)
, use_nulls(use_nulls_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
ProcessorPtr addGroupingSetForTotals(const Block & header, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
{
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
auto & index = dag->getIndex();
if (use_nulls)
{
auto to_nullable = FunctionFactory::instance().get("toNullable", nullptr);
for (const auto & key : keys)
{
const auto * node = dag->getIndex()[header.getPositionByName(key)];
if (node->result_type->canBeInsideNullable())
{
dag->addOrReplaceInIndex(dag->addFunction(to_nullable, { node }, node->result_name));
}
}
}
auto grouping_col = ColumnUInt64::create(1, grouping_set_number);
const auto * grouping_node = &dag->addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
auto & index = dag->getIndex();
index.insert(index.begin(), grouping_node);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
@ -58,10 +73,10 @@ void CubeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQue
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return addGroupingSetForTotals(header, settings, (UInt64(1) << keys_size) - 1);
return addGroupingSetForTotals(header, params.keys, use_nulls, settings, (UInt64(1) << keys_size) - 1);
auto transform_params = std::make_shared<AggregatingTransformParams>(header, std::move(params), final);
return std::make_shared<CubeTransform>(header, std::move(transform_params));
return std::make_shared<CubeTransform>(header, std::move(transform_params), use_nulls);
});
}
@ -73,7 +88,7 @@ const Aggregator::Params & CubeStep::getParams() const
void CubeStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(), appendGroupingSetColumn(params.getHeader(input_streams.front().header, final)), getDataStreamTraits());
input_streams.front(), generateOutputHeader(params.getHeader(input_streams.front().header, final), params.keys, use_nulls), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)

View File

@ -13,7 +13,7 @@ using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams
class CubeStep : public ITransformingStep
{
public:
CubeStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_);
CubeStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool use_nulls_);
String getName() const override { return "Cube"; }
@ -26,6 +26,7 @@ private:
size_t keys_size;
Aggregator::Params params;
bool final;
bool use_nulls;
};
}

View File

@ -22,18 +22,19 @@ static ITransformingStep::Traits getTraits()
};
}
RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_)
: ITransformingStep(input_stream_, appendGroupingSetColumn(params_.getHeader(input_stream_.header, final_)), getTraits())
RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool use_nulls_)
: ITransformingStep(input_stream_, generateOutputHeader(params_.getHeader(input_stream_.header, final_), params_.keys, use_nulls_), getTraits())
, params(std::move(params_))
, keys_size(params.keys_size)
, final(final_)
, use_nulls(use_nulls_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
ProcessorPtr addGroupingSetForTotals(const Block & header, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number);
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number);
void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
@ -42,10 +43,10 @@ void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return addGroupingSetForTotals(header, settings, keys_size);
return addGroupingSetForTotals(header, params.keys, use_nulls, settings, keys_size);
auto transform_params = std::make_shared<AggregatingTransformParams>(header, std::move(params), true);
return std::make_shared<RollupTransform>(header, std::move(transform_params));
return std::make_shared<RollupTransform>(header, std::move(transform_params), use_nulls);
});
}

View File

@ -13,7 +13,7 @@ using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams
class RollupStep : public ITransformingStep
{
public:
RollupStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_);
RollupStep(const DataStream & input_stream_, Aggregator::Params params_, bool final_, bool use_nulls_);
String getName() const override { return "Rollup"; }
@ -25,6 +25,7 @@ private:
Aggregator::Params params;
size_t keys_size;
bool final;
bool use_nulls;
};
}

View File

@ -1,6 +1,7 @@
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include "Processors/Transforms/RollupTransform.h"
namespace DB
{
@ -9,61 +10,32 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_)
: IAccumulatingTransform(std::move(header), appendGroupingSetColumn(params_->getHeader()))
, params(std::move(params_))
CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_, bool use_nulls_)
: GroupByModifierTransform(std::move(header), params_, use_nulls_)
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
{
keys.reserve(params->params.keys_size);
for (const auto & key : params->params.keys)
keys.emplace_back(input.getHeader().getPositionByName(key));
if (keys.size() >= 8 * sizeof(mask))
throw Exception("Too many keys are used for CubeTransform.", ErrorCodes::LOGICAL_ERROR);
}
Chunk CubeTransform::merge(Chunks && chunks, bool final)
{
BlocksList rollup_blocks;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
auto num_rows = rollup_block.rows();
return Chunk(rollup_block.getColumns(), num_rows);
}
void CubeTransform::consume(Chunk chunk)
{
consumed_chunks.emplace_back(std::move(chunk));
}
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n);
Chunk CubeTransform::generate()
{
if (!consumed_chunks.empty())
{
if (consumed_chunks.size() > 1)
cube_chunk = merge(std::move(consumed_chunks), false);
else
cube_chunk = std::move(consumed_chunks.front());
mergeConsumed();
consumed_chunks.clear();
auto num_rows = cube_chunk.getNumRows();
auto num_rows = current_chunk.getNumRows();
mask = (static_cast<UInt64>(1) << keys.size()) - 1;
current_columns = cube_chunk.getColumns();
current_columns = current_chunk.getColumns();
current_zero_columns.clear();
current_zero_columns.reserve(keys.size());
auto const & input_header = getInputPort().getHeader();
for (auto key : keys)
current_zero_columns.emplace_back(getColumnWithDefaults(input_header, key, num_rows));
current_zero_columns.emplace_back(getColumnWithDefaults(key, num_rows));
}
auto gen_chunk = std::move(cube_chunk);
auto gen_chunk = std::move(current_chunk);
if (mask)
{
@ -78,7 +50,7 @@ Chunk CubeTransform::generate()
Chunks chunks;
chunks.emplace_back(std::move(columns), current_columns.front()->size());
cube_chunk = merge(std::move(chunks), false);
current_chunk = merge(std::move(chunks), !use_nulls, false);
}
finalizeChunk(gen_chunk, aggregates_mask);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/IInflatingTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/finalizeChunk.h>
@ -9,30 +10,23 @@ namespace DB
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates all subsets of columns and aggregates over them.
class CubeTransform : public IAccumulatingTransform
class CubeTransform : public GroupByModifierTransform
{
public:
CubeTransform(Block header, AggregatingTransformParamsPtr params);
CubeTransform(Block header, AggregatingTransformParamsPtr params, bool use_nulls_);
String getName() const override { return "CubeTransform"; }
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
ColumnNumbers keys;
const ColumnsMask aggregates_mask;
Chunks consumed_chunks;
Chunk cube_chunk;
Columns current_columns;
Columns current_zero_columns;
UInt64 mask = 0;
UInt64 grouping_set = 0;
Chunk merge(Chunks && chunks, bool final);
};
}

View File

@ -1,36 +1,80 @@
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Columns/ColumnNullable.h>
namespace DB
{
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_)
: IAccumulatingTransform(std::move(header), appendGroupingSetColumn(params_->getHeader()))
GroupByModifierTransform::GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_, bool use_nulls_)
: IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys, use_nulls_))
, params(std::move(params_))
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
, use_nulls(use_nulls_)
{
keys.reserve(params->params.keys_size);
for (const auto & key : params->params.keys)
keys.emplace_back(input.getHeader().getPositionByName(key));
intermediate_header = getOutputPort().getHeader();
intermediate_header.erase(0);
if (use_nulls)
{
auto output_aggregator_params = params->params;
output_aggregator = std::make_unique<Aggregator>(intermediate_header, output_aggregator_params);
}
}
void RollupTransform::consume(Chunk chunk)
void GroupByModifierTransform::consume(Chunk chunk)
{
consumed_chunks.emplace_back(std::move(chunk));
}
Chunk RollupTransform::merge(Chunks && chunks, bool final)
void GroupByModifierTransform::mergeConsumed()
{
BlocksList rollup_blocks;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
if (consumed_chunks.size() > 1)
current_chunk = merge(std::move(consumed_chunks), true, false);
else
current_chunk = std::move(consumed_chunks.front());
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
auto num_rows = rollup_block.rows();
return Chunk(rollup_block.getColumns(), num_rows);
size_t rows = current_chunk.getNumRows();
auto columns = current_chunk.getColumns();
if (use_nulls)
{
for (auto key : keys)
columns[key] = makeNullableSafe(columns[key]);
}
current_chunk = Chunk{ columns, rows };
consumed_chunks.clear();
}
Chunk GroupByModifierTransform::merge(Chunks && chunks, bool is_input, bool final)
{
auto header = is_input ? getInputPort().getHeader() : intermediate_header;
BlocksList blocks;
for (auto & chunk : chunks)
blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns()));
auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final) : output_aggregator->mergeBlocks(blocks, final);
auto num_rows = current_block.rows();
return Chunk(current_block.getColumns(), num_rows);
}
MutableColumnPtr GroupByModifierTransform::getColumnWithDefaults(size_t key, size_t n) const
{
auto const & col = intermediate_header.getByPosition(key);
auto result_column = col.column->cloneEmpty();
col.type->insertManyDefaultsInto(*result_column, n);
return result_column;
}
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_, bool use_nulls_)
: GroupByModifierTransform(std::move(header), params_, use_nulls_)
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
{}
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n)
{
auto const & col = header.getByPosition(key);
@ -43,16 +87,11 @@ Chunk RollupTransform::generate()
{
if (!consumed_chunks.empty())
{
if (consumed_chunks.size() > 1)
rollup_chunk = merge(std::move(consumed_chunks), false);
else
rollup_chunk = std::move(consumed_chunks.front());
consumed_chunks.clear();
mergeConsumed();
last_removed_key = keys.size();
}
auto gen_chunk = std::move(rollup_chunk);
auto gen_chunk = std::move(current_chunk);
if (last_removed_key)
{
@ -61,11 +100,11 @@ Chunk RollupTransform::generate()
auto num_rows = gen_chunk.getNumRows();
auto columns = gen_chunk.getColumns();
columns[key] = getColumnWithDefaults(getInputPort().getHeader(), key, num_rows);
columns[key] = getColumnWithDefaults(key, num_rows);
Chunks chunks;
chunks.emplace_back(std::move(columns), num_rows);
rollup_chunk = merge(std::move(chunks), false);
current_chunk = merge(std::move(chunks), !use_nulls, false);
}
finalizeChunk(gen_chunk, aggregates_mask);

View File

@ -1,4 +1,6 @@
#pragma once
#include <memory>
#include <Core/ColumnNumbers.h>
#include <Processors/IAccumulatingTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/finalizeChunk.h>
@ -6,29 +8,49 @@
namespace DB
{
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates subtotals and grand totals values for a set of columns.
class RollupTransform : public IAccumulatingTransform
struct GroupByModifierTransform : public IAccumulatingTransform
{
public:
RollupTransform(Block header, AggregatingTransformParamsPtr params);
String getName() const override { return "RollupTransform"; }
GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_, bool use_nulls_);
protected:
void consume(Chunk chunk) override;
void mergeConsumed();
Chunk merge(Chunks && chunks, bool is_input, bool final);
MutableColumnPtr getColumnWithDefaults(size_t key, size_t n) const;
AggregatingTransformParamsPtr params;
bool use_nulls;
ColumnNumbers keys;
std::unique_ptr<Aggregator> output_aggregator;
Block intermediate_header;
Chunks consumed_chunks;
Chunk current_chunk;
};
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates subtotals and grand totals values for a set of columns.
class RollupTransform : public GroupByModifierTransform
{
public:
RollupTransform(Block header, AggregatingTransformParamsPtr params, bool use_nulls_);
String getName() const override { return "RollupTransform"; }
protected:
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
ColumnNumbers keys;
const ColumnsMask aggregates_mask;
Chunks consumed_chunks;
Chunk rollup_chunk;
size_t last_removed_key = 0;
size_t set_counter = 0;
Chunk merge(Chunks && chunks, bool final);
};
}

View File

@ -430,6 +430,13 @@ public:
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
void onCancel() override
{
if (!writer)
return;
onFinish();
}
void onException() override
{
if (!writer)

View File

@ -79,13 +79,23 @@ Range createRangeFromParquetStatistics(std::shared_ptr<parquet::ByteArrayStatist
std::optional<size_t> IHiveFile::getRows()
{
if (!rows)
rows = getRowsImpl();
if (!has_init_rows)
{
std::lock_guard lock(mutex);
if (!has_init_rows)
{
rows = getRowsImpl();
has_init_rows = true;
}
}
return rows;
}
void IHiveFile::loadFileMinMaxIndex()
{
if (file_minmax_idx_loaded)
return;
std::lock_guard lock(mutex);
if (file_minmax_idx_loaded)
return;
loadFileMinMaxIndexImpl();
@ -94,6 +104,9 @@ void IHiveFile::loadFileMinMaxIndex()
void IHiveFile::loadSplitMinMaxIndexes()
{
if (split_minmax_idxes_loaded)
return;
std::lock_guard lock(mutex);
if (split_minmax_idxes_loaded)
return;
loadSplitMinMaxIndexesImpl();

View File

@ -149,6 +149,7 @@ protected:
String path;
UInt64 last_modify_time;
size_t size;
std::atomic<bool> has_init_rows = false;
std::optional<size_t> rows;
NamesAndTypesList index_names_and_types;
@ -162,6 +163,9 @@ protected:
/// Skip splits for this file after applying minmax index (if any)
std::unordered_set<int> skip_splits;
std::shared_ptr<HiveSettings> storage_settings;
/// IHiveFile would be shared among multi threads, need lock's protection to update min/max indexes.
std::mutex mutex;
};
using HiveFilePtr = std::shared_ptr<IHiveFile>;
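
A minimal sketch (hypothetical class, not from the commit) of the double-checked initialization that IHiveFile::getRows() now uses: the atomic flag keeps the hot path lock-free, and the second check under the mutex guarantees the expensive work runs exactly once.

#include <atomic>
#include <mutex>
#include <optional>

class LazyRowCount
{
public:
    std::optional<size_t> get()
    {
        if (!has_init)                         /// fast path: value already published
        {
            std::lock_guard lock(mutex);
            if (!has_init)                     /// re-check under the lock
            {
                rows = computeRows();          /// stand-in for getRowsImpl()
                has_init = true;               /// publish only after the value is written
            }
        }
        return rows;
    }

private:
    static std::optional<size_t> computeRows() { return 42; }

    std::atomic<bool> has_init = false;
    std::optional<size_t> rows;
    std::mutex mutex;
};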

View File

@ -127,12 +127,13 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
{
if (part && part->isProjectionPart())
{
data.reportBrokenPart(part->getParentPart()->name);
auto parent_part = part->getParentPart()->shared_from_this();
data.reportBrokenPart(parent_part);
}
else if (part)
data.reportBrokenPart(part);
else
{
data.reportBrokenPart(part_name);
}
LOG_TRACE(log, "Part {} was not found, do not report it as broken", part_name);
};
try

View File

@ -6031,8 +6031,10 @@ void MergeTreeData::reportBrokenPart(MergeTreeData::DataPartPtr & data_part) con
broken_part_callback(part->name);
}
}
else
else if (data_part && data_part->getState() == IMergeTreeDataPart::State::Active)
broken_part_callback(data_part->name);
else
LOG_DEBUG(log, "Will not check potentially broken part {} because it's not active", data_part->getNameWithState());
}
MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & partition_ast, ContextPtr local_context) const

View File

@ -669,12 +669,7 @@ public:
AlterLockHolder & table_lock_holder);
/// Should be called if part data is suspected to be corrupted.
void reportBrokenPart(const String & name) const
{
broken_part_callback(name);
}
/// Same as above but has the ability to check all other parts
/// Has the ability to check all other parts
/// which reside on the same disk of the suspicious part.
void reportBrokenPart(MergeTreeData::DataPartPtr & data_part) const;

View File

@ -383,6 +383,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
merge_threads,
temporary_data_merge_threads,
/* storage_has_evenly_distributed_read_= */ false,
/* group_by_use_nulls */ false,
std::move(group_by_info),
std::move(group_by_sort_description),
should_produce_results_in_order_of_bucket_number);

View File

@ -44,7 +44,7 @@ catch (...)
{
/// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
storage.reportBrokenPart(data_part);
throw;
}

View File

@ -2336,6 +2336,12 @@ bool ReplicatedMergeTreeMergePredicate::hasDropRange(const MergeTreePartInfo & n
return queue.hasDropRange(new_drop_range_info);
}
String ReplicatedMergeTreeMergePredicate::getCoveringVirtualPart(const String & part_name) const
{
std::lock_guard<std::mutex> lock(queue.state_mutex);
return queue.virtual_parts.getContainingPart(MergeTreePartInfo::fromPartName(part_name, queue.format_version));
}
ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)

View File

@ -519,8 +519,12 @@ public:
/// The version of "log" node that is used to check that no new merges have appeared.
int32_t getVersion() const { return merges_version; }
/// Returns true if there's a drop range covering new_drop_range_info
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const;
/// Returns virtual part covering part_name (if any) or empty string
String getCoveringVirtualPart(const String & part_name) const;
private:
const ReplicatedMergeTreeQueue & queue;

View File

@ -813,6 +813,13 @@ public:
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
void onCancel() override
{
if (!writer)
return;
onFinish();
}
void onException() override
{
if (!writer)

View File

@ -1837,8 +1837,8 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
LOG_TRACE(log, "Executing DROP_RANGE {}", entry.new_part_name);
auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block);
part_check_thread.cancelRemovedPartsCheck(drop_range_info);
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
part_check_thread.cancelRemovedPartsCheck(drop_range_info);
/// Delete the parts contained in the range to be deleted.
/// It's important that no old parts remain (after the merge), because otherwise,
@ -1906,8 +1906,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (replace)
{
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
part_check_thread.cancelRemovedPartsCheck(drop_range);
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
part_check_thread.cancelRemovedPartsCheck(drop_range);
}
else
{
@ -7953,12 +7953,31 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
while (true)
{
/// We should be careful when creating an empty part, because we are not sure that this part is still needed.
/// For example, it's possible that part (or partition) was dropped (or replaced) concurrently.
/// We can enqueue part for check from DataPartExchange or SelectProcessor
/// and it's hard to synchronize it with ReplicatedMergeTreeQueue and PartCheckThread...
/// But at least we can ignore parts that are definitely not needed according to virtual parts and drop ranges.
auto pred = queue.getMergePredicate(zookeeper);
String covering_virtual = pred.getCoveringVirtualPart(lost_part_name);
if (covering_virtual.empty())
{
LOG_WARNING(log, "Will not create empty part instead of lost {}, because there's no covering part in replication queue", lost_part_name);
return false;
}
if (pred.hasDropRange(MergeTreePartInfo::fromPartName(covering_virtual, format_version)))
{
LOG_WARNING(log, "Will not create empty part instead of lost {}, because it's covered by DROP_RANGE", lost_part_name);
return false;
}
Coordination::Requests ops;
Coordination::Stat replicas_stat;
auto replicas_path = fs::path(zookeeper_path) / "replicas";
Strings replicas = zookeeper->getChildren(replicas_path, &replicas_stat);
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", pred.getVersion()));
/// In rare cases new replica can appear during check
ops.emplace_back(zkutil::makeCheckRequest(replicas_path, replicas_stat.version));
@ -7988,7 +8007,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
else if (code == Coordination::Error::ZBADVERSION)
{
LOG_INFO(log, "Looks like new replica appearead while creating new empty part, will retry");
LOG_INFO(log, "Looks like log was updated or new replica appeared while creating new empty part, will retry");
}
else
{

View File

@ -602,6 +602,13 @@ public:
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
void onCancel() override
{
if (!writer)
return;
onFinish();
}
void onException() override
{
if (!writer)

View File

@ -450,6 +450,13 @@ void StorageURLSink::consume(Chunk chunk)
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
void StorageURLSink::onCancel()
{
if (!writer)
return;
onFinish();
}
void StorageURLSink::onException()
{
if (!writer)

View File

@ -114,6 +114,7 @@ public:
std::string getName() const override { return "StorageURLSink"; }
void consume(Chunk chunk) override;
void onCancel() override;
void onException() override;
void onFinish() override;

View File

@ -1,5 +1,6 @@
SET enable_optimize_predicate_expression = 0;
SET convert_query_to_cnf = 0;
SET cross_to_inner_join_rewrite = 1;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -1,5 +1,7 @@
DROP TABLE IF EXISTS codecTest;
SET cross_to_inner_join_rewrite = 1;
CREATE TABLE codecTest (
key UInt64,
name String,

View File

@ -7,6 +7,8 @@ DROP TABLE IF EXISTS lineitem;
DROP TABLE IF EXISTS nation;
DROP TABLE IF EXISTS region;
SET cross_to_inner_join_rewrite = 1;
CREATE TABLE part
(
p_partkey Int32, -- PK

View File

@ -1,3 +1,5 @@
SET cross_to_inner_join_rewrite = 1;
SELECT count()
FROM numbers(4) AS n1, numbers(3) AS n2
WHERE n1.number > (select avg(n.number) from numbers(3) n);

View File

@ -1,6 +1,8 @@
-- This test case is almost completely generated by fuzzer.
-- It appeared to trigger assertion.
SET cross_to_inner_join_rewrite = 1;
DROP TABLE IF EXISTS codecTest;
CREATE TABLE codecTest (

View File

@ -1,5 +1,7 @@
DROP TABLE IF EXISTS codecTest;
SET cross_to_inner_join_rewrite = 1;
CREATE TABLE codecTest (
key UInt64,
name String,

View File

@ -0,0 +1,215 @@
-- { echoOn }
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 1
1 \N 1
2 0 2
2 \N 2
3 1 3
3 \N 3
4 0 4
4 \N 4
5 1 5
5 \N 5
6 0 6
6 \N 6
7 1 7
7 \N 7
8 0 8
8 \N 8
9 1 9
9 \N 9
\N \N 45
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
0 0 0
0 0 0
0 0 45
1 0 1
1 1 1
2 0 2
2 0 2
3 0 3
3 1 3
4 0 4
4 0 4
5 0 5
5 1 5
6 0 6
6 0 6
7 0 7
7 1 7
8 0 8
8 0 8
9 0 9
9 1 9
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 1
1 \N 1
2 0 2
2 \N 2
3 1 3
3 \N 3
4 0 4
4 \N 4
5 1 5
5 \N 5
6 0 6
6 \N 6
7 1 7
7 \N 7
8 0 8
8 \N 8
9 1 9
9 \N 9
\N 0 20
\N 1 25
\N \N 45
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
0 0 0
0 0 0
0 0 20
0 0 45
0 1 25
1 0 1
1 1 1
2 0 2
2 0 2
3 0 3
3 1 3
4 0 4
4 0 4
5 0 5
5 1 5
6 0 6
6 0 6
7 0 7
7 1 7
8 0 8
8 0 8
9 0 9
9 1 9
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 1;
0 \N 0
1 \N 1
2 \N 2
3 \N 3
4 \N 4
5 \N 5
6 \N 6
7 \N 7
8 \N 8
9 \N 9
\N 0 20
\N 1 25
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 0;
0 0 0
0 0 20
0 1 25
1 0 1
2 0 2
3 0 3
4 0 4
5 0 5
6 0 6
7 0 7
8 0 8
9 0 9
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2) WITH TOTALS
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 1
1 \N 1
2 0 2
2 \N 2
3 1 3
3 \N 3
4 0 4
4 \N 4
5 1 5
5 \N 5
6 0 6
6 \N 6
7 1 7
7 \N 7
8 0 8
8 \N 8
9 1 9
9 \N 9
\N \N 45
0 0 45
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2) WITH TOTALS
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 1
1 \N 1
2 0 2
2 \N 2
3 1 3
3 \N 3
4 0 4
4 \N 4
5 1 5
5 \N 5
6 0 6
6 \N 6
7 1 7
7 \N 7
8 0 8
8 \N 8
9 1 9
9 \N 9
\N 0 20
\N 1 25
\N \N 45
0 0 45

View File

@ -0,0 +1,62 @@
-- { echoOn }
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 1;
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 0;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2) WITH TOTALS
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2) WITH TOTALS
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;

View File

@ -0,0 +1,157 @@
-- { echoOn }
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 2
1 \N 2
2 0 4
2 \N 4
3 1 6
3 \N 6
4 0 8
4 \N 8
5 1 10
5 \N 10
6 0 12
6 \N 12
7 1 14
7 \N 14
8 0 16
8 \N 16
9 1 18
9 \N 18
\N \N 90
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
0 0 0
0 0 0
0 0 90
1 0 2
1 1 2
2 0 4
2 0 4
3 0 6
3 1 6
4 0 8
4 0 8
5 0 10
5 1 10
6 0 12
6 0 12
7 0 14
7 1 14
8 0 16
8 0 16
9 0 18
9 1 18
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 2
1 \N 2
2 0 4
2 \N 4
3 1 6
3 \N 6
4 0 8
4 \N 8
5 1 10
5 \N 10
6 0 12
6 \N 12
7 1 14
7 \N 14
8 0 16
8 \N 16
9 1 18
9 \N 18
\N 0 40
\N 1 50
\N \N 90
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
0 0 0
0 0 0
0 0 40
0 0 90
0 1 50
1 0 2
1 1 2
2 0 4
2 0 4
3 0 6
3 1 6
4 0 8
4 0 8
5 0 10
5 1 10
6 0 12
6 0 12
7 0 14
7 1 14
8 0 16
8 0 16
9 0 18
9 1 18
SELECT
number,
number % 2,
sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 1;
0 \N 0
1 \N 2
2 \N 4
3 \N 6
4 \N 8
5 \N 10
6 \N 12
7 \N 14
8 \N 16
9 \N 18
\N 0 40
\N 1 50
SELECT
number,
number % 2,
sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 0;
0 0 0
0 0 40
0 1 50
1 0 2
2 0 4
3 0 6
4 0 8
5 0 10
6 0 12
7 0 14
8 0 16
9 0 18

View File

@ -0,0 +1,51 @@
-- { echoOn }
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
SELECT
number,
number % 2,
sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 1;
SELECT
number,
number % 2,
sum(number) AS val
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 0;

View File

@ -0,0 +1,7 @@
1
1
1
1
1
1
1

View File

@ -0,0 +1,22 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 ( x Int ) Engine = Memory;
INSERT INTO t1 VALUES ( 1 ), ( 2 ), ( 3 );
CREATE TABLE t2 ( x Int ) Engine = Memory;
INSERT INTO t2 VALUES ( 2 ), ( 3 ), ( 4 );
SET cross_to_inner_join_rewrite = 1;
SELECT count() = 1 FROM t1, t2 WHERE t1.x > t2.x;
SELECT count() = 2 FROM t1, t2 WHERE t1.x = t2.x;
SELECT count() = 2 FROM t1 CROSS JOIN t2 WHERE t1.x = t2.x;
SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x > t2.x;
SET cross_to_inner_join_rewrite = 2;
SELECT count() = 1 FROM t1, t2 WHERE t1.x > t2.x; -- { serverError INCORRECT_QUERY }
SELECT count() = 2 FROM t1, t2 WHERE t1.x = t2.x;
SELECT count() = 2 FROM t1 CROSS JOIN t2 WHERE t1.x = t2.x;
SELECT count() = 1 FROM t1 CROSS JOIN t2 WHERE t1.x > t2.x; -- do not force rewrite explicit CROSS

View File

@ -0,0 +1,10 @@
1704509 1384
732797 1336
598875 1384
792887 1336
3807842 1336
25703952 1336
716829 1384
59183 1336
33010362 1336
800784 1336

View File

@ -0,0 +1,10 @@
SELECT
CounterID AS k,
quantileBFloat16(0.5)(ResolutionWidth)
FROM remote('127.0.0.{1,2}', test, hits)
GROUP BY k
ORDER BY
count() DESC,
CounterID ASC
LIMIT 10
SETTINGS group_by_use_nulls = 1;