Merge branch 'master' of github.com:yandex/ClickHouse

BayoNet 2018-10-23 14:21:21 +03:00
commit 46019e3ffe
74 changed files with 1646 additions and 307 deletions

View File

@ -26,10 +26,20 @@ struct SortColumnDescription
SortColumnDescription(const std::string & column_name_, int direction_, int nulls_direction_, const std::shared_ptr<Collator> & collator_ = nullptr)
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_), collator(collator_) {}
bool operator == (const SortColumnDescription & other) const
{
return column_name == other.column_name && column_number == other.column_number
&& direction == other.direction && nulls_direction == other.nulls_direction;
}
bool operator != (const SortColumnDescription & other) const
{
return !(*this == other);
}
};
/// Description of the sorting rule for several columns.
using SortDescription = std::vector<SortColumnDescription>;
}
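The comparison operators added here are what make prefix checks on sort requirements possible; the new FinishSortingBlockInputStream below relies on them. A minimal sketch, with hypothetical column names "a" and "b":

#include <Core/SortDescription.h>

using namespace DB;

bool firstColumnMatches()
{
    SortDescription already_sorted;              // the stream is already sorted by `a` ASC
    already_sorted.emplace_back("a", 1, 1);

    SortDescription required;                    // we want ORDER BY a ASC, b ASC
    required.emplace_back("a", 1, 1);
    required.emplace_back("b", 1, 1);

    return already_sorted[0] == required[0];     // true: same column, direction and NULLS placement
}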

View File

@ -0,0 +1,162 @@
#include <DataStreams/FinishSortingBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/processConstants.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static bool isPrefix(const SortDescription & pref_descr, const SortDescription & descr)
{
if (pref_descr.size() > descr.size())
return false;
for (size_t i = 0; i < pref_descr.size(); ++i)
if (pref_descr[i] != descr[i])
return false;
return true;
}
FinishSortingBlockInputStream::FinishSortingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
const SortDescription & description_to_sort_,
size_t max_merged_block_size_, size_t limit_)
: description_sorted(description_sorted_), description_to_sort(description_to_sort_),
max_merged_block_size(max_merged_block_size_), limit(limit_)
{
if (!isPrefix(description_sorted, description_to_sort))
throw Exception("Can't finish sorting. SortDescription of the already sorted stream is not a prefix of the "
"SortDescription needed to sort", ErrorCodes::LOGICAL_ERROR);
children.push_back(input);
header = children.at(0)->getHeader();
removeConstantsFromSortDescription(header, description_to_sort);
}
struct Less
{
const ColumnsWithSortDescriptions & left_columns;
const ColumnsWithSortDescriptions & right_columns;
Less(const ColumnsWithSortDescriptions & left_columns_, const ColumnsWithSortDescriptions & right_columns_) :
left_columns(left_columns_), right_columns(right_columns_) {}
bool operator() (size_t a, size_t b) const
{
for (auto it = left_columns.begin(), jt = right_columns.begin(); it != left_columns.end(); ++it, ++jt)
{
int res = it->second.direction * it->first->compareAt(a, b, *jt->first, it->second.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
};
Block FinishSortingBlockInputStream::readImpl()
{
if (limit && total_rows_processed >= limit)
return {};
Block res;
if (impl)
res = impl->read();
/// If the res block is empty, we have finished sorting the previous chunk of blocks.
if (!res)
{
if (end_of_stream)
return {};
blocks.clear();
if (tail_block)
blocks.push_back(std::move(tail_block));
while (true)
{
Block block = children.back()->read();
/// End of the input stream, but we can't return immediately; we need to merge the already read blocks.
/// Check it later, when we get the end of stream from impl.
if (!block)
{
end_of_stream = true;
break;
}
// If there were only const columns in the sort description, then there is no need to sort.
// Return the blocks as is.
if (description_to_sort.empty())
return block;
size_t size = block.rows();
if (size == 0)
continue;
/// We need to sort each block separately before merging.
sortBlock(block, description_to_sort);
removeConstantsFromBlock(block);
/// Find the position of the last already read key in the current block.
if (!blocks.empty())
{
const Block & last_block = blocks.back();
auto last_columns = getColumnsWithSortDescription(last_block, description_sorted);
auto current_columns = getColumnsWithSortDescription(block, description_sorted);
Less less(last_columns, current_columns);
IColumn::Permutation perm(size);
for (size_t i = 0; i < size; ++i)
perm[i] = i;
auto it = std::upper_bound(perm.begin(), perm.end(), last_block.rows() - 1, less);
/// We need to save the tail of the block, because the next block may start with the same key as the tail
/// and we should sort these rows in one chunk.
if (it != perm.end())
{
size_t tail_pos = it - perm.begin();
Block head_block = block.cloneEmpty();
tail_block = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
{
head_block.getByPosition(i).column = block.getByPosition(i).column->cut(0, tail_pos);
tail_block.getByPosition(i).column = block.getByPosition(i).column->cut(tail_pos, block.rows() - tail_pos);
}
if (head_block.rows())
blocks.push_back(head_block);
break;
}
}
/// If we reach here, it means that the current block is the first in the chunk
/// or it consists entirely of rows with the same key as the tail of the previous block.
blocks.push_back(block);
}
impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description_to_sort, max_merged_block_size, limit);
res = impl->read();
}
if (res)
enrichBlockWithConstants(res, header);
total_rows_processed += res.rows();
return res;
}
}
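The central step of readImpl above is the std::upper_bound search over an identity permutation: it finds the first row of the current block whose key is strictly greater than the last key of the previous block, so everything before that position still belongs to the chunk being merged. A self-contained sketch of the same search with plain integers instead of IColumn (hypothetical data, not the actual column interface):

#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

int main()
{
    std::vector<int> last_block = {1, 2, 4};     // previous block, sorted by the key
    std::vector<int> current    = {4, 4, 5, 7};  // current block, also sorted

    std::vector<size_t> perm(current.size());    // identity permutation, as in readImpl
    std::iota(perm.begin(), perm.end(), 0);

    size_t last_row = last_block.size() - 1;
    auto it = std::upper_bound(perm.begin(), perm.end(), last_row,
        [&](size_t a, size_t b) { return last_block[a] < current[b]; });

    // Rows [0, tail_pos) still belong to the current chunk (the "head");
    // rows [tail_pos, n) are deferred as the "tail", since the next block may continue their key.
    size_t tail_pos = it - perm.begin();
    assert(tail_pos == 2);                       // current[2] == 5 is the first key greater than 4
    return 0;
}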

View File

@ -0,0 +1,51 @@
#pragma once
#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Takes a stream already sorted by `x` and finishes sorting it by (`x`, `y`).
* During sorting, only blocks whose rows are equal by `x` are kept in RAM.
*/
class FinishSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
/// limit - if not 0, allows returning only the first 'limit' rows in sorted order.
FinishSortingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & description_sorted_,
const SortDescription & description_to_sort_,
size_t max_merged_block_size_, size_t limit_);
String getName() const override { return "FinishSorting"; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description_to_sort; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
SortDescription description_sorted;
SortDescription description_to_sort;
size_t max_merged_block_size;
size_t limit;
Block tail_block;
Blocks blocks;
std::unique_ptr<IBlockInputStream> impl;
/// Before the operation, constant columns are removed from the blocks (to avoid excessive virtual function calls)
/// and placed back afterwards.
/// The original block structure is saved here.
Block header;
bool end_of_stream = false;
size_t total_rows_processed = 0;
};
}
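A minimal usage sketch, assuming a hypothetical input stream that is already sorted by column "x" and has to be fully sorted by ("x", "y"):

#include <memory>
#include <DataStreams/FinishSortingBlockInputStream.h>

using namespace DB;

BlockInputStreamPtr finishSortingByXY(const BlockInputStreamPtr & sorted_by_x)
{
    SortDescription sorted;                      // what the input is already sorted by
    sorted.emplace_back("x", 1, 1);

    SortDescription required;                    // what we want: ORDER BY x, y
    required.emplace_back("x", 1, 1);
    required.emplace_back("y", 1, 1);

    return std::make_shared<FinishSortingBlockInputStream>(
        sorted_by_x, sorted, required, /* max_merged_block_size = */ 8192, /* limit = */ 0);
}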

View File

@ -2,9 +2,11 @@
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
namespace ProfileEvents
@ -16,54 +18,6 @@ namespace ProfileEvents
namespace DB
{
/** Remove constant columns from block.
*/
static void removeConstantsFromBlock(Block & block)
{
size_t columns = block.columns();
size_t i = 0;
while (i < columns)
{
if (block.getByPosition(i).column->isColumnConst())
{
block.erase(i);
--columns;
}
else
++i;
}
}
static void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
if (!elem.column_name.empty())
return header.getByName(elem.column_name).column->isColumnConst();
else
return header.safeGetByPosition(elem.column_number).column->isColumnConst();
}), description.end());
}
/** Add back into the block (whose constant columns were removed by the previous function)
* the constant columns from the header (which must have the same structure as the block had before the removal of constants).
*/
static void enrichBlockWithConstants(Block & block, const Block & header)
{
size_t rows = block.rows();
size_t columns = header.columns();
for (size_t i = 0; i < columns; ++i)
{
const auto & col_type_name = header.getByPosition(i);
if (col_type_name.column->isColumnConst())
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
}
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_, size_t max_bytes_before_remerge_,
@ -303,5 +257,4 @@ void MergeSortingBlockInputStream::remerge()
sum_rows_in_blocks = new_sum_rows_in_blocks;
sum_bytes_in_blocks = new_sum_bytes_in_blocks;
}
}

View File

@ -130,5 +130,4 @@ private:
/// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
bool remerge_is_useful = true;
};
}

View File

@ -44,6 +44,9 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
auto dependent_table = context.getTable(database_table.first, database_table.second);
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
if (StoragePtr inner_table = materialized_view.tryGetTargetTable())
addTableLock(inner_table->lockStructure(true, __PRETTY_FUNCTION__));
auto query = materialized_view.getInnerQuery();
BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>(
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr());

View File

@ -0,0 +1,48 @@
#include <DataStreams/processConstants.h>
namespace DB
{
void removeConstantsFromBlock(Block & block)
{
size_t columns = block.columns();
size_t i = 0;
while (i < columns)
{
if (block.getByPosition(i).column->isColumnConst())
{
block.erase(i);
--columns;
}
else
++i;
}
}
void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
if (!elem.column_name.empty())
return header.getByName(elem.column_name).column->isColumnConst();
else
return header.safeGetByPosition(elem.column_number).column->isColumnConst();
}), description.end());
}
void enrichBlockWithConstants(Block & block, const Block & header)
{
size_t rows = block.rows();
size_t columns = header.columns();
for (size_t i = 0; i < columns; ++i)
{
const auto & col_type_name = header.getByPosition(i);
if (col_type_name.column->isColumnConst())
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Core/Block.h>
#include <Core/SortDescription.h>
namespace DB
{
/** Functions for manipulating constant columns for sorting.
* See MergeSortingBlocksBlockInputStream and FinishSortingBlockInputStream for details.
*/
/** Remove constant columns from block.
*/
void removeConstantsFromBlock(Block & block);
void removeConstantsFromSortDescription(const Block & header, SortDescription & description);
/** Add back into the block (whose constant columns were removed by the previous function)
* the constant columns from the header (which must have the same structure as the block had before the removal of constants).
*/
void enrichBlockWithConstants(Block & block, const Block & header);
}
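A minimal sketch of the intended round trip (a hypothetical helper, not part of this commit): constants are stripped from both the block and the sort description before sorting, and re-inserted afterwards from a saved header so that the output keeps the original structure.

#include <DataStreams/processConstants.h>
#include <Interpreters/sortBlock.h>

using namespace DB;

void sortIgnoringConstants(Block & block, SortDescription description)
{
    Block header = block.cloneEmpty();                        // remember the original structure
    removeConstantsFromSortDescription(header, description);  // do not compare by constant columns
    removeConstantsFromBlock(block);                          // drop constant columns before sorting
    sortBlock(block, description);
    enrichBlockWithConstants(block, header);                  // put the constant columns back
}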

View File

@ -11,3 +11,6 @@ target_link_libraries (union_stream2 dbms)
add_executable (collapsing_sorted_stream collapsing_sorted_stream.cpp ${SRCS})
target_link_libraries (collapsing_sorted_stream dbms)
add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS})
target_link_libraries (finish_sorting_stream dbms)

View File

@ -0,0 +1,106 @@
#include <iostream>
#include <iomanip>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Core/SortDescription.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/FinishSortingBlockInputStream.h>
#include <Interpreters/sortBlock.h>
using namespace DB;
int main(int argc, char ** argv)
{
srand(123456);
try
{
size_t m = argc >= 2 ? atoi(argv[1]) : 2;
size_t n = argc >= 3 ? atoi(argv[2]) : 10;
Blocks blocks;
for (size_t t = 0; t < m; ++t)
{
Block block;
for (size_t i = 0; i < 2; ++i)
{
ColumnWithTypeAndName column;
column.name = "col" + std::to_string(i + 1);
column.type = std::make_shared<DataTypeInt32>();
auto col = ColumnInt32::create();
auto & vec = col->getData();
vec.resize(n);
for (size_t j = 0; j < n; ++j)
vec[j] = rand() % 10;
column.column = std::move(col);
block.insert(column);
}
blocks.push_back(block);
}
SortDescription sort_descr;
sort_descr.emplace_back("col1", 1, 1);
for (auto & block : blocks)
sortBlock(block, sort_descr);
BlockInputStreamPtr stream = std::make_shared<MergeSortingBlocksBlockInputStream>(blocks, sort_descr, n);
SortDescription sort_descr_final;
sort_descr_final.emplace_back("col1", 1, 1);
sort_descr_final.emplace_back("col2", 1, 1);
stream = std::make_shared<FinishSortingBlockInputStream>(stream, sort_descr, sort_descr_final, n, 0);
{
Stopwatch stopwatch;
stopwatch.start();
Block res_block = blocks[0].cloneEmpty();
while (Block block = stream->read())
{
for (size_t i = 0; i < block.columns(); ++i)
{
MutableColumnPtr ptr = (*std::move(res_block.getByPosition(i).column)).mutate();
ptr->insertRangeFrom(*block.getByPosition(i).column.get(), 0, block.rows());
}
}
if (res_block.rows() != n * m)
throw Exception("Result block size mismatch");
const auto & columns = res_block.getColumns();
for (size_t i = 1; i < res_block.rows(); ++i)
for (const auto & col : columns)
{
int res = col->compareAt(i - 1, i, *col, 1);
if (res < 0)
break;
else if (res > 0)
throw Exception("Result stream not sorted");
}
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
}
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
return -1;
}
return 0;
}

View File

@ -997,6 +997,10 @@ template <> constexpr bool IsIntegral<DataTypeInt16> = true;
template <> constexpr bool IsIntegral<DataTypeInt32> = true;
template <> constexpr bool IsIntegral<DataTypeInt64> = true;
template <typename DataType> constexpr bool IsFloatingPoint = false;
template <> constexpr bool IsFloatingPoint<DataTypeFloat32> = true;
template <> constexpr bool IsFloatingPoint<DataTypeFloat64> = true;
template <typename DataType> constexpr bool IsDateOrDateTime = false;
template <> constexpr bool IsDateOrDateTime<DataTypeDate> = true;
template <> constexpr bool IsDateOrDateTime<DataTypeDateTime> = true;
@ -1055,7 +1059,12 @@ public:
/// least(Date, Date) -> Date
/// greatest(Date, Date) -> Date
Case<std::is_same_v<LeftDataType, RightDataType> && (std::is_same_v<Op, LeastImpl<T0, T1>> || std::is_same_v<Op, GreatestImpl<T0, T1>>),
LeftDataType>>;
LeftDataType>,
/// Date % Int32 -> Int32
Case<std::is_same_v<Op, ModuloImpl<T0, T1>>, Switch<
Case<IsDateOrDateTime<LeftDataType> && IsIntegral<RightDataType>, RightDataType>,
Case<IsDateOrDateTime<LeftDataType> && IsFloatingPoint<RightDataType>, DataTypeInt32>>>>;
};
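A self-contained sketch of the result-type rule the new Case encodes, written with plain std::conditional instead of the Switch/Case helpers above (simplified, with hypothetical trait names): modulo of a Date by an integral type keeps the right-hand type, while modulo by a floating-point type falls back to Int32.

#include <type_traits>

struct DataTypeDate {};
struct DataTypeInt32 {};
struct DataTypeFloat64 {};

template <typename T> constexpr bool IsDateLhs     = std::is_same_v<T, DataTypeDate>;
template <typename T> constexpr bool IsIntegralRhs = std::is_same_v<T, DataTypeInt32>;
template <typename T> constexpr bool IsFloatRhs    = std::is_same_v<T, DataTypeFloat64>;

template <typename Left, typename Right>
using ModuloResult = std::conditional_t<IsDateLhs<Left> && IsIntegralRhs<Right>, Right,
                     std::conditional_t<IsDateLhs<Left> && IsFloatRhs<Right>, DataTypeInt32, void>>;

static_assert(std::is_same_v<ModuloResult<DataTypeDate, DataTypeInt32>, DataTypeInt32>, "Date % Int32 -> Int32");
static_assert(std::is_same_v<ModuloResult<DataTypeDate, DataTypeFloat64>, DataTypeInt32>, "Date % Float64 -> Int32");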

View File

@ -2,6 +2,8 @@
#include <Parsers/StringRange.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
@ -10,9 +12,6 @@ class Context;
class ASTFunction;
struct ProjectionManipulatorBase;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Set;
using SetPtr = std::shared_ptr<Set>;

View File

@ -1,6 +1,8 @@
#include <Poco/Util/Application.h>
#include <Poco/String.h>
#include <Core/Block.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -161,12 +163,10 @@ ExpressionAnalyzer::ExpressionAnalyzer(
const Names & required_result_columns_,
size_t subquery_depth_,
bool do_global_,
const SubqueriesForSets & subqueries_for_set_)
: query(query_), context(context_), settings(context.getSettings()),
subquery_depth(subquery_depth_),
source_columns(source_columns_), required_result_columns(required_result_columns_),
storage(storage_),
do_global(do_global_), subqueries_for_sets(subqueries_for_set_)
const SubqueriesForSets & subqueries_for_sets_)
: ExpressionAnalyzerData(source_columns_, required_result_columns_, subqueries_for_sets_),
query(query_), context(context_), settings(context.getSettings()), storage(storage_),
subquery_depth(subquery_depth_), do_global(do_global_)
{
select_query = typeid_cast<ASTSelectQuery *>(query.get());
@ -210,7 +210,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
InJoinSubqueriesPreprocessor(context).process(select_query);
/// Optimizes logical expressions.
LogicalExpressionsOptimizer(select_query, settings).perform();
LogicalExpressionsOptimizer(select_query, settings.min_equality_disjunction_chain_length).perform();
/// Creates a dictionary `aliases`: alias -> ASTPtr
{
@ -868,7 +868,7 @@ void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_
{
BlockIO res = interpretSubquery(subquery_or_table_name, context, subquery_depth + 1, {})->execute();
SetPtr set = std::make_shared<Set>(getSetSizeLimits(settings), true);
SetPtr set = std::make_shared<Set>(settings.size_limits_for_set, true);
set->setHeader(res.in->getHeader());
while (Block block = res.in->read())
@ -925,7 +925,8 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
Block sample_block_with_calculated_columns = temp_actions->getSampleBlock();
if (sample_block_with_calculated_columns.has(args.children.at(0)->getColumnName()))
makeExplicitSet(func, sample_block_with_calculated_columns, true, context, getSetSizeLimits(settings), prepared_sets);
makeExplicitSet(func, sample_block_with_calculated_columns, true, context,
settings.size_limits_for_set, prepared_sets);
}
}
}
@ -1048,7 +1049,7 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_subqueries,
bool is_conditional_tree = !isThereArrayJoin(ast) && settings.enable_conditional_computation && !only_consts;
LogAST log;
ActionsVisitor actions_visitor(context, getSetSizeLimits(settings), is_conditional_tree, subquery_depth,
ActionsVisitor actions_visitor(context, settings.size_limits_for_set, is_conditional_tree, subquery_depth,
source_columns, actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage(), log.stream());
actions_visitor.visit(ast);
@ -1062,7 +1063,7 @@ void ExpressionAnalyzer::getActionsFromJoinKeys(const ASTTableJoin & table_join,
bool is_conditional_tree = !isThereArrayJoin(query) && settings.enable_conditional_computation && !only_consts;
LogAST log;
ActionsVisitor actions_visitor(context, getSetSizeLimits(settings), is_conditional_tree, subquery_depth,
ActionsVisitor actions_visitor(context, settings.size_limits_for_set, is_conditional_tree, subquery_depth,
source_columns, actions, prepared_sets, subqueries_for_sets,
no_subqueries, only_consts, !isRemoteStorage(), log.stream());
@ -1320,9 +1321,9 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
if (join_params.strictness == ASTTableJoin::Strictness::Unspecified && join_params.kind != ASTTableJoin::Kind::Cross)
{
if (settings.join_default_strictness.toString() == "ANY")
if (settings.join_default_strictness == "ANY")
join_params.strictness = ASTTableJoin::Strictness::Any;
else if (settings.join_default_strictness.toString() == "ALL")
else if (settings.join_default_strictness == "ALL")
join_params.strictness = ASTTableJoin::Strictness::All;
else
throw Exception("Expected ANY or ALL in JOIN section, because setting (join_default_strictness) is empty", DB::ErrorCodes::EXPECTED_ALL_OR_ANY);
@ -1364,7 +1365,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
{
JoinPtr join = std::make_shared<Join>(
analyzed_join.key_names_left, analyzed_join.key_names_right, analyzed_join.columns_added_by_join_from_right_keys,
settings.join_use_nulls, SizeLimits(settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode),
settings.join_use_nulls, settings.size_limits_for_join,
join_params.kind, join_params.strictness);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs

View File

@ -2,22 +2,22 @@
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Settings.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ActionsVisitor.h>
#include <Core/Block.h>
#include <Parsers/ASTTablesInSelectQuery.h>
namespace DB
{
class Block;
class Context;
class ExpressionActions;
struct ExpressionActionsChain;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
using ASTs = std::vector<ASTPtr>;
struct ASTTableJoin;
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
@ -31,20 +31,104 @@ class ASTExpressionList;
class ASTSelectQuery;
inline SizeLimits getSetSizeLimits(const Settings & settings)
/// ExpressionAnalyzer's sources, intermediates, and results. It splits data from logic and allows testing them separately.
/// If you are not writing a test, you probably don't need it. Use ExpressionAnalyzer itself.
struct ExpressionAnalyzerData
{
return SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode);
}
/// Original columns.
/// First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted.
NamesAndTypesList source_columns;
/// If non-empty, ignore all expressions not from this list.
Names required_result_columns;
SubqueriesForSets subqueries_for_sets;
PreparedSets prepared_sets;
/// Columns after ARRAY JOIN, JOIN, and/or aggregation.
NamesAndTypesList aggregated_columns;
NamesAndTypesList array_join_columns;
bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
bool has_global_subqueries = false;
using Aliases = std::unordered_map<String, ASTPtr>;
Aliases aliases;
/// Which column needs to be ARRAY-JOIN'ed to get the specified one.
/// For example, for `SELECT s.v ... ARRAY JOIN a AS s` will get "s.v" -> "a.v".
NameToNameMap array_join_result_to_source;
/// For the ARRAY JOIN section, mapping from the alias to the full column name.
/// For example, for `ARRAY JOIN [1,2] AS b` "b" -> "array(1,2)" will enter here.
NameToNameMap array_join_alias_to_name;
/// The backward mapping for array_join_alias_to_name.
NameToNameMap array_join_name_to_alias;
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
Tables external_tables;
/// The predicate optimizer overrides the subqueries
bool rewrite_subqueries = false;
protected:
ExpressionAnalyzerData(const NamesAndTypesList & source_columns_,
const Names & required_result_columns_,
const SubqueriesForSets & subqueries_for_sets_)
: source_columns(source_columns_),
required_result_columns(required_result_columns_),
subqueries_for_sets(subqueries_for_sets_)
{}
};
/** Transforms an expression from a syntax tree into a sequence of actions to execute it.
*
* NOTE: if `ast` is a SELECT query from a table, the structure of this table should not change during the lifetime of ExpressionAnalyzer.
*/
class ExpressionAnalyzer : private boost::noncopyable
class ExpressionAnalyzer : private ExpressionAnalyzerData, private boost::noncopyable
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Extracts settings to make explicit which ones are used (and to avoid copying the others).
struct ExtractedSettings
{
/// for QueryNormalizer
const UInt64 max_ast_depth;
const UInt64 max_expanded_ast_elements;
const String count_distinct_implementation;
/// for PredicateExpressionsOptimizer
const bool enable_optimize_predicate_expression;
/// for ExpressionAnalyzer
const bool asterisk_left_columns_only;
const bool use_index_for_in_with_subqueries;
const bool enable_conditional_computation;
const bool join_use_nulls;
const SizeLimits size_limits_for_set;
const SizeLimits size_limits_for_join;
const String join_default_strictness;
const UInt64 min_equality_disjunction_chain_length;
ExtractedSettings(const Settings & settings)
: max_ast_depth(settings.max_ast_depth),
max_expanded_ast_elements(settings.max_expanded_ast_elements),
count_distinct_implementation(settings.count_distinct_implementation),
enable_optimize_predicate_expression(settings.enable_optimize_predicate_expression),
asterisk_left_columns_only(settings.asterisk_left_columns_only),
use_index_for_in_with_subqueries(settings.use_index_for_in_with_subqueries),
enable_conditional_computation(settings.enable_conditional_computation),
join_use_nulls(settings.join_use_nulls),
size_limits_for_set(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode),
size_limits_for_join(settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode),
join_default_strictness(settings.join_default_strictness.toString()),
min_equality_disjunction_chain_length(settings.optimize_min_equality_disjunction_chain_length)
{}
};
public:
ExpressionAnalyzer(
@ -126,6 +210,9 @@ public:
*/
const Tables & getExternalTables() const { return external_tables; }
/// Get intermediates for tests
const ExpressionAnalyzerData & getAnalyzedData() const { return *this; }
/// Create the Sets from the IN section so that the index can be used with them.
void makeSetsForIndex();
@ -137,37 +224,10 @@ private:
ASTPtr query;
ASTSelectQuery * select_query;
const Context & context;
const Settings settings;
const ExtractedSettings settings;
StoragePtr storage; /// The main table in FROM clause, if exists.
size_t subquery_depth;
/** Original columns.
* First, all available columns of the table are placed here. Then (when analyzing the query), unused columns are deleted.
*/
NamesAndTypesList source_columns;
/** If non-empty, ignore all expressions in not from this list.
*/
Names required_result_columns;
/// Columns after ARRAY JOIN, JOIN, and/or aggregation.
NamesAndTypesList aggregated_columns;
NamesAndTypesList array_join_columns;
/// The main table in FROM clause, if exists.
StoragePtr storage;
bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
/// Do I need to prepare for execution global subqueries when analyzing the query.
bool do_global;
bool has_global_subqueries = false;
SubqueriesForSets subqueries_for_sets;
PreparedSets prepared_sets;
bool do_global; /// Whether global subqueries need to be prepared for execution when analyzing the query.
struct AnalyzedJoin
{
@ -228,29 +288,6 @@ private:
AnalyzedJoin analyzed_join;
using Aliases = std::unordered_map<String, ASTPtr>;
Aliases aliases;
using SetOfASTs = std::set<const IAST *>;
using MapOfASTs = std::map<ASTPtr, ASTPtr>;
/// Which column is needed to be ARRAY-JOIN'ed to get the specified.
/// For example, for `SELECT s.v ... ARRAY JOIN a AS s` will get "s.v" -> "a.v".
NameToNameMap array_join_result_to_source;
/// For the ARRAY JOIN section, mapping from the alias to the full column name.
/// For example, for `ARRAY JOIN [1,2] AS b` "b" -> "array(1,2)" will enter here.
NameToNameMap array_join_alias_to_name;
/// The backward mapping for array_join_alias_to_name.
NameToNameMap array_join_name_to_alias;
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
Tables external_tables;
/// Predicate optimizer overrides the sub queries
bool rewrite_subqueries = false;
/** Remove all unnecessary columns from the list of all available columns of the table (`columns`).
* At the same time, form a set of unknown columns (`unknown_required_source_columns`),

View File

@ -443,7 +443,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
{
if (create.is_temporary && create.storage->engine->name != "Memory")
if (create.temporary && create.storage->engine->name != "Memory")
throw Exception(
"Temporary tables can only be created with ENGINE = Memory, not " + create.storage->engine->name,
ErrorCodes::INCORRECT_QUERY);
@ -451,7 +451,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
return;
}
if (create.is_temporary)
if (create.temporary)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Memory";
@ -546,7 +546,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String data_path;
DatabasePtr database;
if (!create.is_temporary)
if (!create.temporary)
{
database = context.getDatabase(database_name);
data_path = database->getDataPath();
@ -578,7 +578,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
create.attach,
false);
if (create.is_temporary)
if (create.temporary)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
@ -601,17 +601,17 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
auto insert = std::make_shared<ASTInsertQuery>();
if (!create.is_temporary)
if (!create.temporary)
insert->database = database_name;
insert->table = table_name;
insert->select = create.select->clone();
if (create.is_temporary && !context.getSessionContext().hasQueryContext())
if (create.temporary && !context.getSessionContext().hasQueryContext())
context.getSessionContext().setQueryContext(context.getSessionContext());
return InterpreterInsertQuery(insert,
create.is_temporary ? context.getSessionContext() : context,
create.temporary ? context.getSessionContext() : context,
context.getSettingsRef().insert_allow_materialized_columns).execute();
}
@ -657,7 +657,7 @@ void InterpreterCreateQuery::checkAccess(const ASTCreateQuery & create)
throw Exception("Cannot create database. DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
}
if (create.is_temporary && readonly >= 2)
if (create.temporary && readonly >= 2)
return;
if (readonly)

View File

@ -30,7 +30,7 @@ bool LogicalExpressionsOptimizer::OrWithExpression::operator<(const OrWithExpres
return std::tie(this->or_function, this->expression) < std::tie(rhs.or_function, rhs.expression);
}
LogicalExpressionsOptimizer::LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, const Settings & settings_)
LogicalExpressionsOptimizer::LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, ExtractedSettings && settings_)
: select_query(select_query_), settings(settings_)
{
}

View File

@ -16,6 +16,7 @@ struct Settings;
class ASTFunction;
class ASTSelectQuery;
/** This class provides functions for optimizing boolean expressions within queries.
*
* For simplicity, we call a homogeneous OR-chain any expression having the following structure:
@ -24,9 +25,18 @@ class ASTSelectQuery;
*/
class LogicalExpressionsOptimizer final
{
struct ExtractedSettings
{
const UInt64 optimize_min_equality_disjunction_chain_length;
ExtractedSettings(UInt64 optimize_min_equality_disjunction_chain_length_)
: optimize_min_equality_disjunction_chain_length(optimize_min_equality_disjunction_chain_length_)
{}
};
public:
/// Constructor. Accepts the root of the query DAG.
LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, const Settings & settings_);
LogicalExpressionsOptimizer(ASTSelectQuery * select_query_, ExtractedSettings && settings_);
/** Replace all rather long homogeneous OR-chains expr = x1 OR ... OR expr = xN
* with the expressions `expr` IN (x1, ..., xN).
@ -90,7 +100,7 @@ private:
private:
ASTSelectQuery * select_query;
const Settings & settings;
const ExtractedSettings settings;
/// Information about the OR-chains inside the query.
DisjunctiveEqualityChainsMap disjunctive_equality_chains_map;
/// Number of processed OR-chains.

View File

@ -16,7 +16,7 @@ namespace DB
static constexpr auto and_function_name = "and";
PredicateExpressionsOptimizer::PredicateExpressionsOptimizer(
ASTSelectQuery * ast_select_, const Settings & settings_, const Context & context_)
ASTSelectQuery * ast_select_, ExtractedSettings && settings_, const Context & context_)
: ast_select(ast_select_), settings(settings_), context(context_)
{
}
@ -395,7 +395,7 @@ ASTs PredicateExpressionsOptimizer::evaluateAsterisk(ASTSelectQuery * select_que
{
const auto database_and_table_ast = static_cast<ASTIdentifier*>(table_expression->database_and_table_name.get());
const auto database_and_table_name = getDatabaseAndTableNameFromIdentifier(*database_and_table_ast);
storage = context.tryGetTable(database_and_table_name.first, database_and_table_name.second);
storage = context.getTable(database_and_table_name.first, database_and_table_name.second);
}
const auto block = storage->getSampleBlock();

View File

@ -41,14 +41,34 @@ using IdentifiersWithQualifiedNameSet = std::vector<IdentifierWithQualifiedName>
*/
class PredicateExpressionsOptimizer
{
/// Extracts settings, mostly to show which are used and which are not.
struct ExtractedSettings
{
/// QueryNormalizer settings
const UInt64 max_ast_depth;
const UInt64 max_expanded_ast_elements;
const String count_distinct_implementation;
/// for PredicateExpressionsOptimizer
const bool enable_optimize_predicate_expression;
template<typename T>
ExtractedSettings(const T & settings)
: max_ast_depth(settings.max_ast_depth),
max_expanded_ast_elements(settings.max_expanded_ast_elements),
count_distinct_implementation(settings.count_distinct_implementation),
enable_optimize_predicate_expression(settings.enable_optimize_predicate_expression)
{}
};
public:
PredicateExpressionsOptimizer(ASTSelectQuery * ast_select_, const Settings & settings_, const Context & context_);
PredicateExpressionsOptimizer(ASTSelectQuery * ast_select_, ExtractedSettings && settings_, const Context & context_);
bool optimize();
private:
ASTSelectQuery * ast_select;
const Settings & settings;
const ExtractedSettings settings;
const Context & context;
enum OptimizeKind

View File

@ -9,7 +9,8 @@
#include <Common/typeid_cast.h>
#include <Poco/String.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <iostream>
//#include <iostream>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -22,9 +23,9 @@ namespace ErrorCodes
QueryNormalizer::QueryNormalizer(ASTPtr & query, const QueryNormalizer::Aliases & aliases,
const Settings & settings, const Names & all_column_names,
ExtractedSettings && settings_, const Names & all_column_names,
const TableNamesAndColumnNames & table_names_and_column_names)
: query(query), aliases(aliases), settings(settings), all_column_names(all_column_names),
: query(query), aliases(aliases), settings(settings_), all_column_names(all_column_names),
table_names_and_column_names(table_names_and_column_names)
{
}
@ -52,7 +53,7 @@ void QueryNormalizer::perform()
void QueryNormalizer::performImpl(ASTPtr & ast, MapOfASTs & finished_asts, SetOfASTs & current_asts, std::string current_alias, size_t level)
{
if (level > settings.max_ast_depth)
throw Exception("Normalized AST is too deep. Maximum: " + settings.max_ast_depth.toString(), ErrorCodes::TOO_DEEP_AST);
throw Exception("Normalized AST is too deep. Maximum: " + toString(settings.max_ast_depth), ErrorCodes::TOO_DEEP_AST);
if (finished_asts.count(ast))
{

View File

@ -2,7 +2,6 @@
#include <Core/Names.h>
#include <Parsers/IAST.h>
#include <Interpreters/Settings.h>
#include <Interpreters/evaluateQualified.h>
namespace DB
@ -22,12 +21,28 @@ inline bool functionIsInOrGlobalInOperator(const String & name)
using TableNameAndColumnNames = std::pair<DatabaseAndTableWithAlias, Names>;
using TableNamesAndColumnNames = std::vector<TableNameAndColumnNames>;
class QueryNormalizer
{
/// Extracts settings, mostly to show which are used and which are not.
struct ExtractedSettings
{
const UInt64 max_ast_depth;
const UInt64 max_expanded_ast_elements;
const String count_distinct_implementation;
template <typename T>
ExtractedSettings(const T & settings)
: max_ast_depth(settings.max_ast_depth),
max_expanded_ast_elements(settings.max_expanded_ast_elements),
count_distinct_implementation(settings.count_distinct_implementation)
{}
};
public:
using Aliases = std::unordered_map<String, ASTPtr>;
QueryNormalizer(ASTPtr & query, const Aliases & aliases, const Settings & settings, const Names & all_columns_name,
QueryNormalizer(ASTPtr & query, const Aliases & aliases, ExtractedSettings && settings, const Names & all_columns_name,
const TableNamesAndColumnNames & table_names_and_column_names);
void perform();
@ -38,7 +53,7 @@ private:
ASTPtr & query;
const Aliases & aliases;
const Settings & settings;
const ExtractedSettings settings;
const Names & all_column_names;
const TableNamesAndColumnNames & table_names_and_column_names;

View File

@ -13,9 +13,19 @@ namespace ErrorCodes
}
using ColumnsWithSortDescriptions = std::vector<std::pair<const IColumn *, SortColumnDescription>>;
static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
{
if (!description.collator)
return false;
static ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description)
if (!typeid_cast<const ColumnString *>(column)) /// TODO Nullable(String)
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
return true;
}
ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnsWithSortDescriptions res;
@ -34,18 +44,6 @@ static ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & b
}
static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
{
if (!description.collator)
return false;
if (!typeid_cast<const ColumnString *>(column)) /// TODO Nullable(String)
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
return true;
}
struct PartialSortingLess
{
const ColumnsWithSortDescriptions & columns;
@ -66,6 +64,7 @@ struct PartialSortingLess
}
};
struct PartialSortingLessWithCollation
{
const ColumnsWithSortDescriptions & columns;

View File

@ -29,4 +29,8 @@ void stableGetPermutation(const Block & block, const SortDescription & descripti
*/
bool isAlreadySorted(const Block & block, const SortDescription & description);
using ColumnsWithSortDescriptions = std::vector<std::pair<const IColumn *, SortColumnDescription>>;
ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description);
}

View File

@ -45,6 +45,10 @@ add_executable (in_join_subqueries_preprocessor in_join_subqueries_preprocessor.
target_link_libraries (in_join_subqueries_preprocessor dbms)
add_check(in_join_subqueries_preprocessor)
add_executable (expression_analyzer expression_analyzer.cpp)
target_link_libraries (expression_analyzer dbms clickhouse_storages_system)
add_check(expression_analyzer)
add_executable (users users.cpp)
target_link_libraries (users dbms ${Boost_FILESYSTEM_LIBRARY})

View File

@ -0,0 +1,148 @@
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/StorageSystemOne.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <Databases/DatabaseMemory.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Analyzers/CollectAliases.h>
#include <Analyzers/ExecuteTableFunctions.h>
#include <Analyzers/CollectTables.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <vector>
#include <unordered_map>
#include <iostream>
using namespace DB;
struct TestEntry
{
String query;
std::unordered_map<String, String> expected_aliases; /// alias -> AST.getID()
NamesAndTypesList source_columns = {};
Names required_result_columns = {};
bool check(const Context & context)
{
ASTPtr ast = parse(query);
ExpressionAnalyzer analyzer(ast, context, {}, source_columns, required_result_columns);
const ExpressionAnalyzerData & data = analyzer.getAnalyzedData();
if (!checkAliases(data))
{
collectWithAnalysers(context, ast);
return false;
}
return true;
}
private:
bool checkAliases(const ExpressionAnalyzerData & data)
{
for (const auto & alias : data.aliases)
{
const String & alias_name = alias.first;
if (expected_aliases.count(alias_name) == 0 ||
expected_aliases[alias_name] != alias.second->getID())
{
std::cout << "unexpected alias: " << alias_name << ' ' << alias.second->getID() << std::endl;
return false;
}
else
expected_aliases.erase(alias_name);
}
if (!expected_aliases.empty())
{
std::cout << "missing aliases: " << expected_aliases.size() << std::endl;
return false;
}
return true;
}
static ASTPtr parse(const std::string & query)
{
ParserSelectQuery parser;
std::string message;
auto text = query.data();
if (ASTPtr ast = tryParseQuery(parser, text, text + query.size(), message, false, "", false, 0))
return ast;
throw Exception(message);
}
void collectWithAnalysers(const Context & context, ASTPtr ast) const
{
ReadBufferFromFileDescriptor in(STDIN_FILENO);
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
CollectAliases collect_aliases;
collect_aliases.process(ast);
ExecuteTableFunctions execute_table_functions;
execute_table_functions.process(ast, context);
CollectTables collect_tables;
collect_tables.process(ast, context, collect_aliases, execute_table_functions);
collect_tables.dump(out);
}
};
int main()
{
std::vector<TestEntry> queries =
{
{
"SELECT number AS n FROM system.numbers LIMIT 0",
{{"n", "Identifier_number"}},
{ NameAndTypePair("number", std::make_shared<DataTypeUInt64>()) }
},
{
"SELECT number AS n FROM system.numbers LIMIT 0",
{{"n", "Identifier_number"}}
}
};
Context context = Context::createGlobal();
auto system_database = std::make_shared<DatabaseMemory>("system");
context.addDatabase("system", system_database);
//context.setCurrentDatabase("system");
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers", false));
size_t success = 0;
for (auto & entry : queries)
{
try
{
if (entry.check(context))
{
++success;
std::cout << "[OK] " << entry.query << std::endl;
}
else
std::cout << "[Failed] " << entry.query << std::endl;
}
catch (Exception & e)
{
std::cout << "[Error] " << entry.query << std::endl << e.displayText() << std::endl;
}
}
return success != queries.size();
}

View File

@ -213,10 +213,7 @@ TestResult check(const TestEntry & entry)
auto select_query = typeid_cast<DB::ASTSelectQuery *>(&*ast_input);
DB::Settings settings;
settings.optimize_min_equality_disjunction_chain_length = entry.limit;
DB::LogicalExpressionsOptimizer optimizer(select_query, settings);
DB::LogicalExpressionsOptimizer optimizer(select_query, entry.limit);
optimizer.perform();
/// Parse the expected result.

View File

@ -1,7 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
@ -113,12 +113,9 @@ protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
class ASTAlterQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
String database;
String table;
ASTAlterCommandList * command_list = nullptr;
String getID() const override;

View File

@ -1,11 +1,11 @@
#pragma once
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
namespace DB
{
struct ASTCheckQuery : public ASTQueryWithOutput
struct ASTCheckQuery : public ASTQueryWithTableAndOutput
{
/** Get the text that identifies this element. */
String getID() const override { return ("CheckQuery_" + database + "_" + table); }
@ -18,9 +18,6 @@ struct ASTCheckQuery : public ASTQueryWithOutput
return res;
}
std::string database;
std::string table;
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState &, FormatStateStacked frame) const override
{

View File

@ -4,7 +4,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
@ -74,7 +74,7 @@ public:
/// CREATE TABLE or ATTACH TABLE query
class ASTCreateQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
class ASTCreateQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool attach{false}; /// Query ATTACH TABLE, not CREATE TABLE.
@ -82,9 +82,6 @@ public:
bool is_view{false};
bool is_materialized_view{false};
bool is_populate{false};
bool is_temporary{false};
String database;
String table;
ASTExpressionList * columns = nullptr;
String to_database; /// For CREATE MATERIALIZED VIEW mv TO table.
String to_table;
@ -155,7 +152,7 @@ protected:
settings.ostr
<< (settings.hilite ? hilite_keyword : "")
<< (attach ? "ATTACH " : "CREATE ")
<< (is_temporary ? "TEMPORARY " : "")
<< (temporary ? "TEMPORARY " : "")
<< what << " "
<< (if_not_exists ? "IF NOT EXISTS " : "")
<< (settings.hilite ? hilite_none : "")

View File

@ -1,6 +1,6 @@
#pragma once
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
@ -9,7 +9,7 @@ namespace DB
/** DROP query
*/
class ASTDropQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
class ASTDropQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
enum Kind
@ -21,9 +21,6 @@ public:
Kind kind;
bool if_exists{false};
bool temporary{false};
String database;
String table;
/** Get the text that identifies this element. */
String getID() const override;

View File

@ -1,7 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
@ -10,12 +10,9 @@ namespace DB
/** OPTIMIZE query
*/
class ASTOptimizeQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
class ASTOptimizeQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
String database;
String table;
/// The partition to optimize can be specified.
ASTPtr partition;
/// A flag can be specified - perform optimization "to the end" instead of one step.
@ -44,7 +41,6 @@ public:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &new_database) const override;
};
}

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Common/typeid_cast.h>
#include "ASTSelectWithUnionQuery.h"
namespace DB
@ -339,14 +340,28 @@ const ASTTablesInSelectQueryElement * ASTSelectQuery::join() const
void ASTSelectQuery::setDatabaseIfNeeded(const String & database_name)
{
ASTTableExpression * table_expression = getFirstTableExpression(*this);
if (!table_expression)
if (!tables)
return;
if (!table_expression->database_and_table_name)
return;
ASTTablesInSelectQuery & tables_in_select_query = static_cast<ASTTablesInSelectQuery &>(*tables);
if (table_expression->database_and_table_name->children.empty())
for (auto & child : tables_in_select_query.children)
{
const auto & tables_element = static_cast<ASTTablesInSelectQueryElement &>(*child);
if (tables_element.table_expression)
{
const auto table_expression = static_cast<ASTTableExpression *>(tables_element.table_expression.get());
if (!table_expression->database_and_table_name && !table_expression->subquery)
continue;
if (table_expression->subquery)
{
const auto subquery = static_cast<const ASTSubquery *>(table_expression->subquery.get());
const auto select_with_union_query = static_cast<ASTSelectWithUnionQuery *>(subquery->children[0].get());
select_with_union_query->setDatabaseIfNeeded(database_name);
}
else if (table_expression->database_and_table_name->children.empty())
{
ASTPtr database = ASTIdentifier::createSpecial(database_name);
ASTPtr table = table_expression->database_and_table_name;
@ -359,6 +374,8 @@ void ASTSelectQuery::setDatabaseIfNeeded(const String & database_name)
{
throw Exception("Logical error: more than two components in table expression", ErrorCodes::LOGICAL_ERROR);
}
}
}
}

View File

@ -391,7 +391,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->is_view = is_view;
query->is_materialized_view = is_materialized_view;
query->is_populate = is_populate;
query->is_temporary = is_temporary;
query->temporary = is_temporary;
if (database)
query->database = typeid_cast<ASTIdentifier &>(*database).name;

View File

@ -26,7 +26,8 @@ struct KafkaSettings
M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.")
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.") \
M(SettingUInt64, kafka_max_block_size, 0, "The maximum block size per table for Kafka engine.")
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};

View File

@ -25,6 +25,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageFactory.h>
#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>
@ -268,7 +269,8 @@ StorageKafka::StorageKafka(
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_)
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, size_t max_block_size_)
: IStorage{columns_},
table_name(table_name_), database_name(database_name_), context(context_),
topics(context.getMacros()->expand(topics_)),
@ -277,7 +279,7 @@ StorageKafka::StorageKafka(
format_name(context.getMacros()->expand(format_name_)),
row_delimiter(row_delimiter_),
schema_name(context.getMacros()->expand(schema_name_)),
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
num_consumers(num_consumers_), max_block_size(max_block_size_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers()
{
task = context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
@ -295,10 +297,10 @@ BlockInputStreams StorageKafka::read(
{
check(column_names);
if (num_consumers == 0)
if (num_created_consumers == 0)
return BlockInputStreams();
const size_t stream_count = std::min(num_streams, num_consumers);
const size_t stream_count = std::min(num_streams, num_created_consumers);
BlockInputStreams streams;
streams.reserve(stream_count);
@ -434,26 +436,44 @@ void StorageKafka::pushConsumer(StorageKafka::ConsumerPtr c)
semaphore.set();
}
bool StorageKafka::checkDependencies(const String & database_name, const String & table_name)
{
// Check if all dependencies are attached
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() == 0)
return true;
// Check whether the dependencies are ready
for (const auto & db_tab : dependencies)
{
auto table = context.tryGetTable(db_tab.first, db_tab.second);
if (!table)
return false;
// If it is a materialized view, check its target table
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
if (materialized_view && !materialized_view->tryGetTargetTable())
return false;
// Check all its dependencies
if (!checkDependencies(db_tab.first, db_tab.second))
return false;
}
return true;
}
void StorageKafka::streamThread()
{
try
{
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled)
{
// Check if all dependencies are attached
// Check if at least one direct dependency is attached
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() == 0)
break;
// Check the dependencies are ready?
bool ready = true;
for (const auto & db_tab : dependencies)
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0 && dependencies.size() > 0)
{
if (!context.tryGetTable(db_tab.first, db_tab.second))
ready = false;
}
if (!ready)
if (!checkDependencies(database_name, table_name))
break;
LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");
@ -488,12 +508,14 @@ bool StorageKafka::streamToViews()
// Limit the number of batched messages to allow early cancellations
const Settings & settings = context.getSettingsRef();
const size_t block_size = settings.max_block_size.value;
size_t block_size = max_block_size;
if (block_size == 0)
block_size = settings.max_block_size.value;
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_consumers);
for (size_t i = 0; i < num_consumers; ++i)
streams.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, block_size);
streams.emplace_back(stream);
@ -509,7 +531,7 @@ bool StorageKafka::streamToViews()
// Join multiple streams if necessary
BlockInputStreamPtr in;
if (streams.size() > 1)
in = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, num_consumers);
in = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, streams.size());
else
in = streams[0];
@ -644,6 +666,7 @@ void registerStorageKafka(StorageFactory & factory)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size)
#undef CHECK_KAFKA_STORAGE_ARGUMENT
// Get and check broker list
@ -790,9 +813,28 @@ void registerStorageKafka(StorageFactory & factory)
num_consumers = kafka_settings.kafka_num_consumers.value;
}
// Parse max block size (optional)
size_t max_block_size = 0;
if (args_count >= 8)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[7].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
{
max_block_size = static_cast<size_t>(safeGet<UInt64>(ast->value));
}
else
{
throw Exception("Maximum block size must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_max_block_size.changed)
{
max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size.value);
}
return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,
brokers, group, topics, format, row_delimiter, schema, num_consumers);
brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size);
});
}

View File

@ -83,6 +83,8 @@ private:
const String schema_name;
/// Total number of consumers
size_t num_consumers;
/// Maximum block size for insertion into this table
size_t max_block_size;
/// Number of actually created consumers.
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
@ -105,6 +107,7 @@ private:
void streamThread();
bool streamToViews();
bool checkDependencies(const String & database_name, const String & table_name);
protected:
StorageKafka(
@ -113,7 +116,8 @@ protected:
Context & context_,
const ColumnsDescription & columns_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, char row_delimiter_, const String & schema_name_, size_t num_consumers_);
const String & format_name_, char row_delimiter_, const String & schema_name_,
size_t num_consumers_, size_t max_block_size_);
};
}

View File

@ -137,11 +137,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
const Context & context,
const size_t max_block_size,
const unsigned num_streams,
Int64 max_block_number_to_read) const
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
return readFromParts(
data.getDataPartsVector(), column_names_to_return, query_info, context,
max_block_size, num_streams, max_block_number_to_read);
max_block_size, num_streams, max_block_numbers_to_read);
}
BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
@ -151,7 +151,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
const Context & context,
const size_t max_block_size,
const unsigned num_streams,
Int64 max_block_number_to_read) const
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
size_t part_index = 0;
@ -271,8 +271,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
part->minmax_idx.parallelogram, data.minmax_idx_column_types))
continue;
if (max_block_number_to_read && part->info.max_block > max_block_number_to_read)
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
parts.push_back(part);
}

View File

@ -20,15 +20,17 @@ public:
MergeTreeDataSelectExecutor(const MergeTreeData & data_);
/** When reading, selects a set of parts that covers the desired range of the index.
* max_block_number_to_read - if not zero, do not read all the parts whose right border is greater than this threshold.
* max_block_numbers_to_read - if not nullptr, do not read parts whose right border is greater than the max_block for their partition.
*/
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
size_t max_block_size,
unsigned num_streams,
Int64 max_block_number_to_read) const;
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
BlockInputStreams readFromParts(
MergeTreeData::DataPartsVector parts,
@ -37,7 +39,7 @@ public:
const Context & context,
size_t max_block_size,
unsigned num_streams,
Int64 max_block_number_to_read) const;
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
private:
const MergeTreeData & data;

View File

@ -1401,7 +1401,16 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
Coordination::GetResponse quorum_last_part_response = quorum_last_part_future.get();
if (!quorum_last_part_response.error)
last_quorum_part = quorum_last_part_response.data;
{
ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(queue.format_version);
if (!quorum_last_part_response.data.empty())
{
parts_with_quorum.fromString(quorum_last_part_response.data);
last_quorum_parts.clear();
for (const auto & added_part : parts_with_quorum.added_parts)
last_quorum_parts.emplace(added_part.second);
}
}
Coordination::GetResponse quorum_status_response = quorum_status_future.get();
if (!quorum_status_response.error)
@ -1460,7 +1469,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
for (const MergeTreeData::DataPartPtr & part : {left, right})
{
if (part->name == last_quorum_part)
if (last_quorum_parts.find(part->name) != last_quorum_parts.end())
{
if (out_reason)
*out_reason = "Part " + part->name + " is the most recent part with a satisfied quorum";
@ -1563,7 +1572,7 @@ std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio
/// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to
/// version X for this part.
if (part->name == last_quorum_part
if (last_quorum_parts.find(part->name) != last_quorum_parts.end()
|| part->name == inprogress_quorum_part)
return {};

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
@ -375,7 +376,7 @@ private:
std::unordered_map<String, std::set<Int64>> committing_blocks;
/// Quorum state taken at some later time than prev_virtual_parts.
String last_quorum_part;
std::set<std::string> last_quorum_parts;
String inprogress_quorum_part;
};

View File

@ -0,0 +1,124 @@
#pragma once
#include <unordered_map>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
namespace DB
{
struct ReplicatedMergeTreeQuorumAddedParts
{
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
using PartitionIdToPartName = std::unordered_map<String, String>;
PartitionIdToPartName added_parts;
MergeTreeDataFormatVersion format_version;
ReplicatedMergeTreeQuorumAddedParts(const MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
{}
/// Write the set of added parts to the buffer.
void write(WriteBuffer & out)
{
out << "version: " << 2 << '\n';
out << "parts count: " << added_parts.size() << '\n';
for (const auto & part : added_parts)
out << part.first << '\t' << part.second << '\n';
}
PartitionIdToMaxBlock getMaxInsertedBlocks()
{
PartitionIdToMaxBlock max_added_blocks;
for (const auto & part : added_parts)
{
auto part_info = MergeTreePartInfo::fromPartName(part.second, format_version);
max_added_blocks[part.first] = part_info.max_block;
}
return max_added_blocks;
}
void read(ReadBuffer & in)
{
if (checkString("version: ", in))
{
size_t version;
readText(version, in);
assertChar('\n', in);
if (version == 2)
added_parts = read_v2(in);
}
else
added_parts = read_v1(in);
}
/// Read added blocks when the node in ZooKeeper supports only one partition.
PartitionIdToPartName read_v1(ReadBuffer & in)
{
PartitionIdToPartName parts_in_quorum;
std::string part_name;
readText(part_name, in);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
parts_in_quorum[part_info.partition_id] = part_name;
return parts_in_quorum;
}
/// Read added blocks when the node in ZooKeeper supports multiple partitions.
PartitionIdToPartName read_v2(ReadBuffer & in)
{
assertString("parts count: ", in);
PartitionIdToPartName parts_in_quorum;
uint64_t parts_count;
readText(parts_count, in);
assertChar('\n', in);
for (uint64_t i = 0; i < parts_count; ++i)
{
std::string partition_id;
std::string part_name;
readText(partition_id, in);
assertChar('\t', in);
readText(part_name, in);
assertChar('\n', in);
parts_in_quorum[partition_id] = part_name;
}
return parts_in_quorum;
}
void fromString(const std::string & str)
{
ReadBufferFromString in(str);
read(in);
}
std::string toString()
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
};
}

View File

@ -27,7 +27,7 @@ public:
unsigned num_streams) override
{
return MergeTreeDataSelectExecutor(part->storage).readFromParts(
{part}, column_names, query_info, context, max_block_size, num_streams, 0);
{part}, column_names, query_info, context, max_block_size, num_streams);
}
bool supportsIndexForIn() const override { return true; }

View File

@ -48,6 +48,9 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -69,9 +72,6 @@ private:
Context & global_context;
bool has_inner_table = false;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
void checkStatementCanBeForwarded() const;
protected:

View File

@ -114,7 +114,7 @@ BlockInputStreams StorageMergeTree::read(
const size_t max_block_size,
const unsigned num_streams)
{
return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
return reader.read(column_names, query_info, context, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Settings & /*settings*/)

View File

@ -11,6 +11,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Databases/IDatabase.h>
@ -2674,8 +2675,22 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
Coordination::Requests ops;
Coordination::Responses responses;
Coordination::Stat added_parts_stat;
String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat);
ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(data.format_version);
if (!old_added_parts.empty())
parts_with_quorum.fromString(old_added_parts);
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
parts_with_quorum.added_parts[part_info.partition_id] = part_name;
String new_added_parts = parts_with_quorum.toString();
ops.emplace_back(zkutil::makeRemoveRequest(quorum_status_path, stat.version));
ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, part_name, -1));
ops.emplace_back(zkutil::makeSetRequest(quorum_last_part_path, new_added_parts, added_parts_stat.version));
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::ZOK)
@ -2950,37 +2965,56 @@ BlockInputStreams StorageReplicatedMergeTree::read(
* 2. Do not read parts that have not yet been written to the quorum of the replicas.
* For this you have to synchronously go to ZooKeeper.
*/
Int64 max_block_number_to_read = 0;
if (settings.select_sequential_consistency)
{
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;
for (const auto & data_part : data.getDataParts())
{
max_added_blocks[data_part->info.partition_id] = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
}
auto zookeeper = getZooKeeper();
String last_part;
zookeeper->tryGet(zookeeper_path + "/quorum/last_part", last_part);
const String quorum_status_path = zookeeper_path + "/quorum/status";
if (!last_part.empty() && !data.getActiveContainingPart(last_part)) /// TODO Disable replica for distributed queries.
throw Exception("Replica doesn't have part " + last_part + " which was successfully written to quorum of other replicas."
" Send query to another replica or disable 'select_sequential_consistency' setting.", ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
String value;
Coordination::Stat stat;
if (last_part.empty()) /// If no part has been written with quorum.
{
String quorum_str;
if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_str))
if (zookeeper->tryGet(quorum_status_path, value, &stat))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_str);
quorum_entry.fromString(value);
auto part_info = MergeTreePartInfo::fromPartName(quorum_entry.part_name, data.format_version);
max_block_number_to_read = part_info.min_block - 1;
max_added_blocks[part_info.partition_id] = part_info.max_block - 1;
}
}
else
String added_parts_str;
if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", added_parts_str))
{
auto part_info = MergeTreePartInfo::fromPartName(last_part, data.format_version);
max_block_number_to_read = part_info.max_block;
if (!added_parts_str.empty())
{
ReplicatedMergeTreeQuorumAddedParts part_with_quorum(data.format_version);
part_with_quorum.fromString(added_parts_str);
auto added_parts = part_with_quorum.added_parts;
for (const auto & added_part : added_parts)
if (!data.getActiveContainingPart(added_part.second))
throw Exception("Replica doesn't have part " + added_part.second + " which was successfully written to quorum of other replicas."
" Send query to another replica or disable 'select_sequential_consistency' setting.", ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);
for (const auto & max_block : part_with_quorum.getMaxInsertedBlocks())
max_added_blocks[max_block.first] = max_block.second;
}
}
return reader.read(column_names, query_info, context, max_block_size, num_streams, max_block_number_to_read);
return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
}
return reader.read(column_names, query_info, context, max_block_size, num_streams);
}

View File

@ -0,0 +1,12 @@
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,13 @@
SELECT toDate('2018-06-21') % 234 = toInt16(toDate('2018-06-21')) % 234;
SELECT toDate('2018-06-21') % 23456 = toInt16(toDate('2018-06-21')) % 23456;
SELECT toDate('2018-06-21') % 12376 = toInt16(toDate('2018-06-21')) % 12376;
SELECT toDateTime('2018-06-21 12:12:12') % 234 = toInt32(toDateTime('2018-06-21 12:12:12')) % 234;
SELECT toDateTime('2018-06-21 12:12:12') % 23456 = toInt32(toDateTime('2018-06-21 12:12:12')) % 23456;
SELECT toDateTime('2018-06-21 12:12:12') % 12376 = toInt32(toDateTime('2018-06-21 12:12:12')) % 12376;
SELECT toDate('2018-06-21') % 234.8 = toInt16(toDate('2018-06-21')) % 234.8;
SELECT toDate('2018-06-21') % 23456.8 = toInt16(toDate('2018-06-21')) % 23456.8;
SELECT toDate('2018-06-21') % 12376.8 = toInt16(toDate('2018-06-21')) % 12376.8;
SELECT toDateTime('2018-06-21 12:12:12') % 234.8 = toInt32(toDateTime('2018-06-21 12:12:12')) % 234.8;
SELECT toDateTime('2018-06-21 12:12:12') % 23456.8 = toInt32(toDateTime('2018-06-21 12:12:12')) % 23456.8;
SELECT toDateTime('2018-06-21 12:12:12') % 12376.8 = toInt32(toDateTime('2018-06-21 12:12:12')) % 12376.8;

View File

@ -0,0 +1,18 @@
1
2
3
1
2
3
1
2
3
4
5
6
1
2
3
4
5
6

View File

@ -0,0 +1,30 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;
CREATE TABLE test.quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '1') ORDER BY x PARTITION BY y;
CREATE TABLE test.quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '2') ORDER BY x PARTITION BY y;
INSERT INTO test.quorum1 VALUES (1, '1990-11-15');
INSERT INTO test.quorum1 VALUES (2, '1990-11-15');
INSERT INTO test.quorum1 VALUES (3, '2020-12-16');
SYSTEM SYNC REPLICA test.quorum2;
SET select_sequential_consistency=1;
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
SET insert_quorum=2;
INSERT INTO test.quorum1 VALUES (4, '1990-11-15');
INSERT INTO test.quorum1 VALUES (5, '1990-11-15');
INSERT INTO test.quorum1 VALUES (6, '2020-12-16');
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;

View File

@ -0,0 +1,18 @@
1
2
3
1
2
3
1
2
3
4
1
2
3
4
1
2
3
4

View File

@ -0,0 +1,38 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;
CREATE TABLE test.quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '1') ORDER BY x PARTITION BY y;
CREATE TABLE test.quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '2') ORDER BY x PARTITION BY y;
SET insert_quorum=2;
SET select_sequential_consistency=1;
INSERT INTO test.quorum1 VALUES (1, '2018-11-15');
INSERT INTO test.quorum1 VALUES (2, '2018-11-15');
INSERT INTO test.quorum1 VALUES (3, '2018-12-16');
SET insert_quorum_timeout=0;
SYSTEM STOP FETCHES test.quorum1;
INSERT INTO test.quorum2 VALUES (4, toDate('2018-12-16')); -- { serverError 319 }
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
SET select_sequential_consistency=0;
SELECT x FROM test.quorum2 ORDER BY x;
SET select_sequential_consistency=1;
SYSTEM START FETCHES test.quorum1;
SYSTEM SYNC REPLICA test.quorum1;
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;

View File

@ -0,0 +1,35 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;
CREATE TABLE test.quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '1') ORDER BY x PARTITION BY y;
CREATE TABLE test.quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '2') ORDER BY x PARTITION BY y;
SET insert_quorum=2;
SET select_sequential_consistency=1;
SET insert_quorum_timeout=0;
SYSTEM STOP FETCHES test.quorum1;
INSERT INTO test.quorum2 VALUES (1, '2018-11-15'); -- { serverError 319 }
SELECT count(*) FROM test.quorum1;
SELECT count(*) FROM test.quorum2;
SET select_sequential_consistency=0;
SELECT x FROM test.quorum2 ORDER BY x;
SET select_sequential_consistency=1;
SET insert_quorum_timeout=100;
SYSTEM START FETCHES test.quorum1;
SYSTEM SYNC REPLICA test.quorum1;
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;

View File

@ -0,0 +1,28 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;
CREATE TABLE test.quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '1') ORDER BY x PARTITION BY y;
CREATE TABLE test.quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '2') ORDER BY x PARTITION BY y;
INSERT INTO test.quorum1 VALUES (1, '1990-11-15');
INSERT INTO test.quorum1 VALUES (2, '1990-11-15');
INSERT INTO test.quorum1 VALUES (3, '2020-12-16');
SYSTEM SYNC REPLICA test.quorum2;
SET select_sequential_consistency=1;
SET insert_quorum=2;
SET insert_quorum_timeout=0;
SYSTEM STOP FETCHES test.quorum1;
INSERT INTO test.quorum2 VALUES (4, toDate('2020-12-16')); -- { serverError 319 }
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;

View File

@ -0,0 +1,23 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;
CREATE TABLE test.quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '1') ORDER BY x PARTITION BY y;
CREATE TABLE test.quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '2') ORDER BY x PARTITION BY y;
SET insert_quorum=2;
SET select_sequential_consistency=1;
INSERT INTO test.quorum1 VALUES (1, '2018-11-15');
INSERT INTO test.quorum1 VALUES (2, '2018-11-15');
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
OPTIMIZE TABLE test.quorum1 PARTITION '2018-11-15' FINAL;
SELECT count(*) FROM system.parts WHERE active AND database = 'test' AND table='quorum1';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;

View File

@ -0,0 +1,20 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;
CREATE TABLE test.quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '1') ORDER BY x PARTITION BY y;
CREATE TABLE test.quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/quorum', '2') ORDER BY x PARTITION BY y;
SET insert_quorum=2;
SET select_sequential_consistency=1;
INSERT INTO test.quorum1 VALUES (1, '2018-11-15');
INSERT INTO test.quorum1 VALUES (2, '2018-11-15');
INSERT INTO test.quorum1 VALUES (3, '2018-12-16');
SELECT x FROM test.quorum1 ORDER BY x;
SELECT x FROM test.quorum2 ORDER BY x;
DROP TABLE IF EXISTS test.quorum1;
DROP TABLE IF EXISTS test.quorum2;

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
echo "DROP TABLE IF EXISTS test.tab;
DROP TABLE IF EXISTS test.mv;
CREATE TABLE test.tab(a Int) ENGINE = Log;
CREATE MATERIALIZED VIEW test.mv ENGINE = Log AS SELECT a FROM test.tab;" | ${CLICKHOUSE_CLIENT} -n
${CLICKHOUSE_CLIENT} --query "INSERT INTO test.tab SELECT number FROM numbers(10000000)" &
function drop()
{
sleep 0.1
${CLICKHOUSE_CLIENT} --query "DROP TABLE test.\`.inner.mv\`" -n
}
drop &
wait
echo "DROP TABLE IF EXISTS test.tab;
DROP TABLE IF EXISTS test.mv;" | ${CLICKHOUSE_CLIENT} -n

View File

@ -0,0 +1,6 @@
1
1
1 1
1
1
1 1

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS test.test;
DROP TABLE IF EXISTS test.test_view;
DROP TABLE IF EXISTS test.test_nested_view;
DROP TABLE IF EXISTS test.test_joined_view;
USE test;
CREATE VIEW test AS SELECT 1 AS N;
CREATE VIEW test_view AS SELECT * FROM test;
CREATE VIEW test_nested_view AS SELECT * FROM (SELECT * FROM test);
CREATE VIEW test_joined_view AS SELECT * FROM test ANY LEFT JOIN test USING N;
SELECT * FROM test_view;
SELECT * FROM test_nested_view;
SELECT * FROM test_joined_view;
USE default;
SELECT * FROM test.test_view;
SELECT * FROM test.test_nested_view;
SELECT * FROM test.test_joined_view;
DROP TABLE IF EXISTS test.test;
DROP TABLE IF EXISTS test.test_view;
DROP TABLE IF EXISTS test.test_nested_view;

View File

@ -1,18 +0,0 @@
DROP TABLE IF EXISTS test.whoami;
DROP TABLE IF EXISTS test.tellme;
DROP TABLE IF EXISTS test.tellme_nested;
use test;
create view whoami as select 1 as n;
create view tellme as select * from whoami;
create view tellme_nested as select * from (select * from whoami);
select * from tellme;
select * from tellme_nested;
use default;
select * from test.tellme;
select * from test.tellme_nested;
DROP TABLE test.whoami;
DROP TABLE test.tellme;
DROP TABLE test.tellme_nested;

View File

@ -0,0 +1,28 @@
FROM ubuntu:18.04
RUN apt-get update -y \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python \
python-lxml \
python-termcolor \
python-requests \
curl \
sudo \
openssl \
netcat-openbsd \
telnet
COPY ./stress /stress
CMD dpkg -i package_folder/clickhouse-common-static_*.deb; \
dpkg -i package_folder/clickhouse-server_*.deb; \
dpkg -i package_folder/clickhouse-client_*.deb; \
dpkg -i package_folder/clickhouse-test_*.deb; \
service clickhouse-server start && sleep 1 && ./stress --output-folder test_output

View File

@ -0,0 +1,31 @@
Allows running a simple ClickHouse stress test in Docker from debian packages.
It runs a single copy of clickhouse-performance-test and multiple copies
of clickhouse-test (functional tests). This helps to find problems such as
segmentation faults that cause the server to shut down.
Usage:
```
$ ls $HOME/someclickhouse
clickhouse-client_18.14.9_all.deb clickhouse-common-static_18.14.9_amd64.deb clickhouse-server_18.14.9_all.deb clickhouse-test_18.14.9_all.deb
$ docker run --volume=$HOME/someclickhouse:/package_folder --volume=$HOME/test_output:/test_output yandex/clickhouse-stress-test
Selecting previously unselected package clickhouse-common-static.
(Reading database ... 14442 files and directories currently installed.)
...
Start clickhouse-server service: Path to data directory in /etc/clickhouse-server/config.xml: /var/lib/clickhouse/
DONE
2018-10-22 13:40:35,744 Will wait functests to finish
2018-10-22 13:40:40,747 Finished 0 from 16 processes
2018-10-22 13:40:45,751 Finished 0 from 16 processes
...
2018-10-22 13:49:11,165 Finished 15 from 16 processes
2018-10-22 13:49:16,171 Checking ClickHouse still alive
Still alive
2018-10-22 13:49:16,195 Stress is ok
2018-10-22 13:49:16,195 Copying server log files
$ ls $HOME/test_result
clickhouse-server.err.log clickhouse-server.log.0.gz stderr.log stress_test_run_0.txt stress_test_run_11.txt stress_test_run_13.txt
stress_test_run_15.txt stress_test_run_2.txt stress_test_run_4.txt stress_test_run_6.txt stress_test_run_8.txt clickhouse-server.log
perf_stress_run.txt stdout.log stress_test_run_10.txt stress_test_run_12.txt
stress_test_run_14.txt stress_test_run_1.txt
stress_test_run_3.txt stress_test_run_5.txt stress_test_run_7.txt stress_test_run_9.txt
```

docker/test/stress/stress Executable file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
#-*- coding: utf-8 -*-
from multiprocessing import cpu_count
from subprocess import Popen, check_call
import os
import shutil
import argparse
import logging
import time
def run_perf_test(cmd, xmls_path, output_folder):
output_path = os.path.join(output_folder, "perf_stress_run.txt")
f = open(output_path, 'w')
p = Popen("{} --skip-tags=long --r {}".format(cmd, xmls_path), shell=True, stdout=f, stderr=f)
return p
def run_func_test(cmd, output_prefix, num_processes):
output_paths = [os.path.join(output_prefix, "stress_test_run_{}.txt".format(i)) for i in range(num_processes)]
f = open(output_paths[0], 'w')
pipes = [Popen("{}".format(cmd), shell=True, stdout=f, stderr=f)]
for output_path in output_paths[1:]:
time.sleep(0.5)
f = open(output_path, 'w')
p = Popen("{} --order=random".format(cmd), shell=True, stdout=f, stderr=f)
pipes.append(p)
return pipes
def check_clickhouse_alive(cmd):
try:
logging.info("Checking ClickHouse still alive")
check_call("{} --query \"select 'Still alive'\"".format(cmd), shell=True)
return True
except Exception:
return False
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
parser = argparse.ArgumentParser(description="ClickHouse script for running stresstest")
parser.add_argument("--test-cmd", default='clickhouse-test')
parser.add_argument("--client-cmd", default='clickhouse-client')
parser.add_argument("--perf-test-cmd", default='clickhouse-performance-test')
parser.add_argument("--perf-test-xml-path", default='/usr/share/clickhouse-test/performance/')
parser.add_argument("--server-log-folder", default='/var/log/clickhouse-server')
parser.add_argument("--output-folder")
parser.add_argument("--num-parallel", default=cpu_count() // 3);
args = parser.parse_args()
func_pipes = []
perf_process = None
try:
perf_process = run_perf_test(args.perf_test_cmd, args.perf_test_xml_path, args.output_folder)
func_pipes = run_func_test(args.test_cmd, args.output_folder, args.num_parallel)
logging.info("Will wait functests to finish")
while True:
retcodes = []
for p in func_pipes:
if p.poll() is not None:
retcodes.append(p.returncode)
if len(retcodes) == len(func_pipes):
break
logging.info("Finished %s from %s processes", len(retcodes), len(func_pipes))
time.sleep(5)
if not check_clickhouse_alive(args.client_cmd):
raise Exception("Stress failed, results in logs")
else:
logging.info("Stress is ok")
except Exception as ex:
raise ex
finally:
if os.path.exists(args.server_log_folder):
logging.info("Copying server log files")
for log_file in os.listdir(args.server_log_folder):
shutil.copy(os.path.join(args.server_log_folder, log_file), os.path.join(args.output_folder, log_file))

View File

@ -18,7 +18,11 @@ MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_
- `user` — The MySQL User.
- `password` — User password.
- `replace_query` — Flag that sets query substitution `INSERT INTO` to `REPLACE INTO`. If `replace_query=1`, the query is replaced.
- `'on_duplicate_clause'` — Adds the `ON DUPLICATE KEY UPDATE 'on_duplicate_clause'` expression to the `INSERT` query. For example: `impression = VALUES(impression) + impression`. To specify `'on_duplicate_clause'` you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `'on_duplicate_clause'`, ClickHouse generates an exception.
- `on_duplicate_clause` — Adds the `ON DUPLICATE KEY on_duplicate_clause` expression to the `INSERT` query.
Example: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`, where `on_duplicate_clause` is `UPDATE c2 = c2 + 1`. See the MySQL documentation to find out which `on_duplicate_clause` you can use with the `ON DUPLICATE KEY` clause.
To specify `on_duplicate_clause` you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `on_duplicate_clause`, ClickHouse generates an exception.
At this time, simple `WHERE` clauses such as ` =, !=, >, >=, <, <=` are executed on the MySQL server.
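For illustration, a minimal sketch of a table that uses the `MySQL` engine with `replace_query = 0` and the `on_duplicate_clause` from the example above; the host, database, table name and credentials here are placeholders, and the remote table is assumed to have a unique key on `c1`:
```
CREATE TABLE mysql_t (c1 String, c2 UInt32)
ENGINE = MySQL('localhost:3306', 'test', 't', 'user', 'password', 0, 'UPDATE c2 = c2 + 1');

INSERT INTO mysql_t VALUES ('a', 2);  -- on a duplicate key in MySQL, c2 is incremented instead of the row being replaced
```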

View File

@ -400,6 +400,7 @@ Setting fields:
- `db` Name of the database.
- `table` Name of the table.
- `where ` The selection criteria. May be omitted.
- `invalidate_query` Query for checking the dictionary status. Optional parameter. Read more in the section [Updating dictionaries](external_dicts_dict_lifetime.md#dicts-external_dicts_dict_lifetime).
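An `invalidate_query` is typically a cheap query whose result changes only when the source data changes; a hypothetical example (the table and column names are illustrative only):
```
SELECT max(modification_time) FROM dictionary_source_table
```
If the returned value differs from the one stored at the previous check, the dictionary is reloaded.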
<a name="dicts-external_dicts_dict_sources-mongodb"></a>

View File

@ -53,7 +53,7 @@ Combines arrays passed as arguments.
arrayConcat(arrays)
```
**Arguments**
**Parameters**
- `arrays` Arrays of comma-separated `[values]`.
@ -92,6 +92,79 @@ SELECT has([1, 2, NULL], NULL)
└─────────────────────────┘
```
## hasAll
Checks whether one array is a subset of another.
```
hasAll(set, subset)
```
**Parameters**
- `set` Array of any type with a set of elements.
- `subset` Array of any type with elements that should be tested to be a subset of `set`.
**Return values**
- `1`, if `set` contains all of the elements from `subset`.
- `0`, otherwise.
**Peculiar properties**
- An empty array is a subset of any array.
- `Null` is processed as a value.
- The order of values in both arrays doesn't matter.
**Examples**
`SELECT hasAll([], [])` returns 1.
`SELECT hasAll([1, Null], [Null])` returns 1.
`SELECT hasAll([1.0, 2, 3, 4], [1, 3])` returns 1.
`SELECT hasAll(['a', 'b'], ['a'])` returns 1.
`SELECT hasAll([1], ['a'])` returns 0.
`SELECT hasAll([[1, 2], [3, 4]], [[1, 2], [3, 5]])` returns 0.
## hasAny
Checks whether two arrays intersect in at least one element.
```
hasAny(array1, array2)
```
**Parameters**
- `array1` Array of any type with a set of elements.
- `array2` Array of any type with a set of elements.
**Return values**
- `1`, if `array1` and `array2` have at least one element in common.
- `0`, otherwise.
**Peculiar properties**
- `Null` is processed as a value.
- The order of values in both arrays doesn't matter.
**Examples**
`SELECT hasAny([1], [])` returns `0`.
`SELECT hasAny([Null], [Null, 1])` returns `1`.
`SELECT hasAny([-128, 1., 512], [1])` returns `1`.
`SELECT hasAny([[1, 2], [3, 4]], ['a', 'c'])` returns `0`.
`SELECT hasAny([[1, 2], [3, 4]], [[1, 2], [1, 2]])` returns `1`.
## indexOf(arr, x)
Returns the index of the first 'x' element (starting from 1) if it is in the array, or 0 if it is not.
@ -99,7 +172,7 @@ Returns the index of the first 'x' element (starting from 1) if it is in the arr
Example:
```
:) select indexOf([1,3,NULL,NULL],NULL)
:) SELECT indexOf([1,3,NULL,NULL],NULL)
SELECT indexOf([1, 3, NULL, NULL], NULL)
@ -230,7 +303,7 @@ Removes the last item from the array.
arrayPopBack(array)
```
**Arguments**
**Parameters**
- `array` Array.
@ -254,7 +327,7 @@ Removes the first item from the array.
arrayPopFront(array)
```
**Arguments**
**Parameters**
- `array` Array.
@ -278,7 +351,7 @@ Adds one item to the end of the array.
arrayPushBack(array, single_value)
```
**Arguments**
**Parameters**
- `array` Array.
- `single_value` A single value. Only numbers can be added to an array with numbers, and only strings can be added to an array of strings. When adding numbers, ClickHouse automatically sets the `single_value` type for the data type of the array. For more information about the types of data in ClickHouse, see "[Data types](../../data_types/index.md#data_types)". Can be `NULL`. The function adds a `NULL` element to an array, and the type of array elements converts to `Nullable`.
@ -303,7 +376,7 @@ Adds one element to the beginning of the array.
arrayPushFront(array, single_value)
```
**Arguments**
**Parameters**
- `array` Array.
- `single_value` A single value. Only numbers can be added to an array with numbers, and only strings can be added to an array of strings. When adding numbers, ClickHouse automatically sets the `single_value` type for the data type of the array. For more information about the types of data in ClickHouse, see "[Data types](../../data_types/index.md#data_types)". Can be `NULL`. The function adds a `NULL` element to an array, and the type of array elements converts to `Nullable`.
@ -366,7 +439,7 @@ Returns a slice of the array.
arraySlice(array, offset[, length])
```
**Arguments**
**Parameters**
- `array` Array of data.
- `offset` Indent from the edge of the array. A positive value indicates an offset on the left, and a negative value is an indent on the right. Numbering of the array items begins with 1.

View File

@ -60,6 +60,9 @@ Returns the date.
Rounds down a date or date with time to the first day of the month.
Returns the date.
!!! attention
The behavior of parsing incorrect dates is implementation-specific. ClickHouse may return a zero date, throw an exception, or perform "natural" overflow.
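For a correctly formed date the rounding is straightforward; a small illustrative example:
```
SELECT toStartOfMonth(toDate('2018-06-21'));              -- 2018-06-01
SELECT toStartOfMonth(toDateTime('2018-06-21 12:12:12')); -- 2018-06-01
```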
## toStartOfQuarter
Rounds down a date or date with time to the first day of the quarter.

View File

@ -18,7 +18,11 @@ MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_
- `user` — The MySQL user.
- `password` — User password.
- `replace_query` — Flag that switches the `INSERT INTO` query to `REPLACE INTO`. If `replace_query=1`, the query is replaced.
- `'on_duplicate_clause'` — Adds the `ON DUPLICATE KEY UPDATE 'on_duplicate_clause'` expression to the `INSERT` query. For example: `impression = VALUES(impression) + impression`. To specify `'on_duplicate_clause'`, you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `'on_duplicate_clause'`, ClickHouse throws an exception.
- `on_duplicate_clause` — Adds the `ON DUPLICATE KEY 'on_duplicate_clause'` expression to the `INSERT` query.
For example: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`, where `on_duplicate_clause` is `UPDATE c2 = c2 + 1`. See the MySQL documentation to find out which `on_duplicate_clause` expressions can be used with the `ON DUPLICATE KEY` clause.
To specify `on_duplicate_clause`, you need to pass `0` to the `replace_query` parameter. If you simultaneously pass `replace_query = 1` and `on_duplicate_clause`, ClickHouse throws an exception.
At this time, simple `WHERE` clauses such as `=, !=, >, >=, <, <=` are executed on the MySQL server.

View File

@ -51,6 +51,9 @@ SELECT
Rounds down a date or date with time to the first day of the month.
Returns the date.
!!! attention
The return value for incorrect dates is implementation-specific. ClickHouse may return a zero date, throw an exception, or perform "natural" overflow from one month into the next.
## toStartOfQuarter
Rounds down a date or date with time to the first day of the quarter.
The first day of the quarter is one of 1 January, 1 April, 1 July, or 1 October.

View File

@ -147,7 +147,7 @@ const char * __shm_directory(size_t * len)
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
void explicit_bzero(void * buf, size_t len)
void __attribute__((__weak__)) explicit_bzero(void * buf, size_t len)
{
memset(buf, 0, len);
__asm__ __volatile__("" :: "r"(buf) : "memory");