This commit is contained in:
Evgeniy Gatov 2016-03-08 20:51:32 +03:00
commit de920939e2
45 changed files with 614 additions and 296 deletions

View File

@ -56,6 +56,7 @@ add_library (dbms
include/DB/Parsers/ASTExpressionList.h
include/DB/Parsers/ASTQueryWithOutput.h
include/DB/Parsers/ParserSelectQuery.h
include/DB/Parsers/ParserTableExpression.h
include/DB/Parsers/ParserUseQuery.h
include/DB/Parsers/ASTShowTablesQuery.h
include/DB/Parsers/ASTFunction.h
@ -726,6 +727,7 @@ add_library (dbms
src/Parsers/ParserQueryWithOutput.cpp
src/Parsers/ParserCreateQuery.cpp
src/Parsers/ParserSelectQuery.cpp
src/Parsers/ParserTableExpression.cpp
src/Parsers/ParserJoin.cpp
src/Parsers/ParserInsertQuery.cpp
src/Parsers/ParserDropQuery.cpp

View File

@ -67,14 +67,19 @@ struct OutputData<StreamUnionMode::ExtraInfo>
template <StreamUnionMode mode = StreamUnionMode::Basic>
class UnionBlockInputStream : public IProfilingBlockInputStream
{
public:
using ExceptionCallback = std::function<void()>;
private:
using Self = UnionBlockInputStream<mode>;
public:
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads) :
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads,
ExceptionCallback exception_callback_ = ExceptionCallback()) :
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
processor(inputs, additional_input_at_end, max_threads, handler)
processor(inputs, additional_input_at_end, max_threads, handler),
exception_callback(exception_callback_)
{
children = inputs;
if (additional_input_at_end)
@ -203,7 +208,11 @@ protected:
output_queue.pop(received_payload);
if (received_payload.exception)
{
if (exception_callback)
exception_callback();
std::rethrow_exception(received_payload.exception);
}
if (!received_payload.block)
all_read = true;
@ -299,6 +308,8 @@ private:
Handler handler;
ParallelInputsProcessor<Handler, mode> processor;
ExceptionCallback exception_callback;
Payload received_payload;
bool started = false;

View File

@ -1,6 +1,6 @@
#pragma once
#include <google/dense_hash_map>
#include <sparsehash/dense_hash_map>
#include <Poco/File.h>
#include <Poco/NumberParser.h>

View File

@ -121,6 +121,7 @@ private:
size_t local_shard_count = 0;
};
class Clusters
{
public:

View File

@ -271,7 +271,8 @@ private:
* Установить has_aggregation = true, если есть GROUP BY или хотя бы одна агрегатная функция.
*/
void analyzeAggregation();
void getAggregates(ASTPtr ast, ExpressionActionsPtr & actions);
void getAggregates(const ASTPtr & ast, ExpressionActionsPtr & actions);
void assertNoAggregates(const ASTPtr & ast, const char * description);
/** Получить множество нужных столбцов для чтения из таблицы.
* При этом, столбцы, указанные в ignored_names, считаются ненужными. И параметр ignored_names может модифицироваться.

View File

@ -49,7 +49,7 @@ public:
bool distinct = false;
ASTPtr select_expression_list;
ASTPtr database;
ASTPtr table; /// Идентификатор, табличная функция или подзапрос (рекурсивно ASTSelectQuery)
ASTPtr table; /// Имя таблицы, табличная функция или подзапрос (рекурсивно ASTSelectQuery)
bool array_join_is_left = false; /// LEFT ARRAY JOIN
ASTPtr array_join_expression_list; /// ARRAY JOIN
ASTPtr join; /// Обычный (не ARRAY) JOIN.

View File

@ -0,0 +1,20 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
namespace DB
{
/** Имя таблицы (с или без имени БД), табличная функция, подзапрос.
* Без модификаторов FINAL, SAMPLE и т. п.
* Без алиаса.
*/
class ParserTableExpression : public IParserBase
{
protected:
const char * getName() const { return "table or subquery or table function"; }
bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
};
}

View File

@ -252,16 +252,18 @@ public:
return NameAndTypePair("_part", new DataTypeString);
if (column_name == "_part_index")
return NameAndTypePair("_part_index", new DataTypeUInt64);
if (column_name == "_sample_factor")
return NameAndTypePair("_sample_factor", new DataTypeFloat64);
return ITableDeclaration::getColumn(column_name);
}
bool hasColumn(const String & column_name) const override
{
if (column_name == "_part")
return true;
if (column_name == "_part_index")
return true;
return ITableDeclaration::hasColumn(column_name);
return ITableDeclaration::hasColumn(column_name)
|| column_name == "_part"
|| column_name == "_part_index"
|| column_name == "_sample_factor";
}
String getFullPath() const { return full_path; }

View File

@ -17,7 +17,6 @@ public:
MergeTreeDataSelectExecutor(MergeTreeData & data_);
/** При чтении, выбирается набор кусков, покрывающий нужный диапазон индекса.
* Если inout_part_index != nullptr, из этого счетчика берутся значения для виртуального столбца _part_index.
* max_block_number_to_read - если не ноль - не читать все куски, у которых правая граница больше этого порога.
*/
BlockInputStreams read(
@ -28,7 +27,7 @@ public:
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads,
size_t * inout_part_index,
size_t * inout_part_index, /// Если не nullptr, из этого счетчика берутся значения для виртуального столбца _part_index.
Int64 max_block_number_to_read) const;
private:

View File

@ -318,6 +318,7 @@ private:
const auto rows = block.rowsInFirstColumn();
/// add virtual columns
/// Кроме _sample_factor, который добавляется снаружи.
if (!virt_column_names.empty())
{
for (const auto & virt_column_name : virt_column_names)

View File

@ -155,7 +155,6 @@ private:
zkutil::RWLock createLock();
zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
zkutil::SingleBarrier createSubscribeBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);

View File

@ -45,8 +45,8 @@ public:
bool supportsIndexForIn() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
NameAndTypePair getColumn(const String &column_name) const override;
bool hasColumn(const String &column_name) const override;
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
BlockInputStreams read(
const Names & column_names,

View File

@ -72,11 +72,13 @@ private:
context.assertDatabaseExists(source_database);
const Tables & tables = context.getDatabases().at(source_database);
for (Tables::const_iterator it = tables.begin(); it != tables.end(); ++it)
{
if (table_name_regexp.match(it->first))
{
any_table = it->second;
break;
}
}
}
if (!any_table)

View File

@ -337,7 +337,8 @@ namespace ErrorCodes
extern const int RESHARDING_INVALID_QUERY = 331;
extern const int RESHARDING_INITIATOR_CHECK_FAILED = 332;
extern const int RWLOCK_ALREADY_HELD = 333;
extern const int BARRIER_TIMEOUT = 334;
extern const int RWLOCK_NO_SUCH_LOCK = 334;
extern const int BARRIER_TIMEOUT = 335;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -130,6 +130,9 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);
if (config_keys.empty())
throw Exception("No cluster elements (shard, node) specified in config at path " + cluster_name, ErrorCodes::SHARD_HAS_NO_CONNECTIONS);
const auto & config_prefix = cluster_name + ".";
UInt32 current_shard_num = 1;
@ -374,7 +377,7 @@ void Cluster::assignName()
SHA512_Init(&ctx);
for (const auto & host : elements)
SHA512_Update(&ctx, reinterpret_cast<const void *>(host.data()), host.size());
SHA512_Update(&ctx, reinterpret_cast<const void *>(host.c_str()), host.size() + 1);
SHA512_Final(hash, &ctx);

View File

@ -68,6 +68,7 @@ namespace ErrorCodes
extern const int PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS;
extern const int DUPLICATE_COLUMN;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
extern const int ILLEGAL_AGGREGATION;
}
@ -128,7 +129,7 @@ bool functionIsInOrGlobalInOperator(const String & name)
void ExpressionAnalyzer::init()
{
select_query = typeid_cast<ASTSelectQuery *>(&*ast);
select_query = typeid_cast<ASTSelectQuery *>(ast.get());
/// В зависимости от профиля пользователя проверить наличие прав на выполнение
/// распределённых подзапросов внутри секций IN или JOIN и обработать эти подзапросы.
@ -292,18 +293,18 @@ void ExpressionAnalyzer::initGlobalSubqueries(ASTPtr & ast)
/// Рекурсивные вызовы. Не опускаемся в подзапросы.
for (auto & child : ast->children)
if (!typeid_cast<ASTSelectQuery *>(&*child))
if (!typeid_cast<ASTSelectQuery *>(child.get()))
initGlobalSubqueries(child);
/// Действия, выполняемые снизу вверх.
if (ASTFunction * node = typeid_cast<ASTFunction *>(&*ast))
if (ASTFunction * node = typeid_cast<ASTFunction *>(ast.get()))
{
/// Для GLOBAL IN.
if (do_global && (node->name == "globalIn" || node->name == "globalNotIn"))
addExternalStorage(node->arguments->children.at(1));
}
else if (ASTJoin * node = typeid_cast<ASTJoin *>(&*ast))
else if (ASTJoin * node = typeid_cast<ASTJoin *>(ast.get()))
{
/// Для GLOBAL JOIN.
if (do_global && node->locality == ASTJoin::Global)
@ -321,7 +322,7 @@ void ExpressionAnalyzer::findExternalTables(ASTPtr & ast)
/// Если идентификатор типа таблица
StoragePtr external_storage;
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(ast.get()))
if (node->kind == ASTIdentifier::Table)
if ((external_storage = context.tryGetExternalTable(node->name)))
external_tables[node->name] = external_storage;
@ -338,7 +339,7 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
if (!(storage && storage->isRemote()))
return;
if (const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*subquery_or_table_name))
if (const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(subquery_or_table_name.get()))
{
/// Если это уже внешняя таблица, ничего заполять не нужно. Просто запоминаем ее наличие.
if (external_tables.end() != external_tables.find(table->name))
@ -449,7 +450,7 @@ NamesAndTypesList::iterator ExpressionAnalyzer::findColumn(const String & name,
/// Например, при ignore_levels=1 ast не может быть занесен в словарь, но его дети могут.
void ExpressionAnalyzer::addASTAliases(ASTPtr & ast, int ignore_levels)
{
ASTSelectQuery * select = typeid_cast<ASTSelectQuery *>(&*ast);
ASTSelectQuery * select = typeid_cast<ASTSelectQuery *>(ast.get());
/// Обход снизу-вверх. Не опускаемся в подзапросы.
for (auto & child : ast->children)
@ -461,7 +462,7 @@ void ExpressionAnalyzer::addASTAliases(ASTPtr & ast, int ignore_levels)
if (select && child == select->array_join_expression_list)
new_ignore_levels = 2;
if (!typeid_cast<ASTSelectQuery *>(&*child))
if (!typeid_cast<ASTSelectQuery *>(child.get()))
addASTAliases(child, new_ignore_levels);
}
@ -481,9 +482,9 @@ void ExpressionAnalyzer::addASTAliases(ASTPtr & ast, int ignore_levels)
StoragePtr ExpressionAnalyzer::getTable()
{
if (const ASTSelectQuery * select = typeid_cast<const ASTSelectQuery *>(&*ast))
if (const ASTSelectQuery * select = typeid_cast<const ASTSelectQuery *>(ast.get()))
{
if (select->table && !typeid_cast<const ASTSelectQuery *>(&*select->table) && !typeid_cast<const ASTFunction *>(&*select->table))
if (select->table && !typeid_cast<const ASTSelectQuery *>(select->table.get()) && !typeid_cast<const ASTFunction *>(select->table.get()))
{
String database = select->database
? typeid_cast<const ASTIdentifier &>(*select->database).name
@ -527,7 +528,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
/// rewrite правила, которые действуют при обходе сверху-вниз.
bool replaced = false;
ASTFunction * func_node = typeid_cast<ASTFunction *>(&*ast);
ASTFunction * func_node = typeid_cast<ASTFunction *>(ast.get());
if (func_node)
{
/** Нет ли в таблице столбца, название которого полностью совпадает с записью функции?
@ -545,7 +546,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
/// Может быть указано IN t, где t - таблица, что равносильно IN (SELECT * FROM t).
if (functionIsInOrGlobalInOperator(func_node->name))
if (ASTIdentifier * right = typeid_cast<ASTIdentifier *>(&*func_node->arguments->children.at(1)))
if (ASTIdentifier * right = typeid_cast<ASTIdentifier *>(func_node->arguments->children.at(1).get()))
right->kind = ASTIdentifier::Table;
/// А ещё, в качестве исключения, будем понимать count(*) как count(), а не count(список всех столбцов).
@ -555,7 +556,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
func_node->arguments->children.clear();
}
}
else if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
else if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(ast.get()))
{
if (node->kind == ASTIdentifier::Column)
{
@ -581,13 +582,13 @@ void ExpressionAnalyzer::normalizeTreeImpl(
}
}
}
else if (ASTExpressionList * node = typeid_cast<ASTExpressionList *>(&*ast))
else if (ASTExpressionList * node = typeid_cast<ASTExpressionList *>(ast.get()))
{
/// Заменим * на список столбцов.
ASTs & asts = node->children;
for (int i = static_cast<int>(asts.size()) - 1; i >= 0; --i)
{
if (ASTAsterisk * asterisk = typeid_cast<ASTAsterisk *>(&*asts[i]))
if (ASTAsterisk * asterisk = typeid_cast<ASTAsterisk *>(asts[i].get()))
{
ASTs all_columns;
for (const auto & column_name_type : columns)
@ -598,10 +599,10 @@ void ExpressionAnalyzer::normalizeTreeImpl(
}
}
}
else if (ASTJoin * node = typeid_cast<ASTJoin *>(&*ast))
else if (ASTJoin * node = typeid_cast<ASTJoin *>(ast.get()))
{
/// может быть указано JOIN t, где t - таблица, что равносильно JOIN (SELECT * FROM t).
if (ASTIdentifier * right = typeid_cast<ASTIdentifier *>(&*node->table))
if (ASTIdentifier * right = typeid_cast<ASTIdentifier *>(node->table.get()))
right->kind = ASTIdentifier::Table;
}
@ -626,7 +627,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
{
auto & child = func_node->arguments->children[i];
if (typeid_cast<ASTSelectQuery *>(&*child))
if (typeid_cast<const ASTSelectQuery *>(child.get()))
continue;
normalizeTreeImpl(child, finished_asts, current_asts, current_alias);
@ -636,7 +637,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
{
for (auto & child : ast->children)
{
if (typeid_cast<ASTSelectQuery *>(&*child))
if (typeid_cast<const ASTSelectQuery *>(child.get()))
continue;
normalizeTreeImpl(child, finished_asts, current_asts, current_alias);
@ -644,7 +645,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
}
/// Если секция WHERE или HAVING состоит из одного алиаса, ссылку нужно заменить не только в children, но и в where_expression и having_expression.
if (ASTSelectQuery * select = typeid_cast<ASTSelectQuery *>(&*ast))
if (ASTSelectQuery * select = typeid_cast<ASTSelectQuery *>(ast.get()))
{
if (select->prewhere_expression)
normalizeTreeImpl(select->prewhere_expression, finished_asts, current_asts, current_alias);
@ -656,7 +657,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
/// Действия, выполняемые снизу вверх.
if (ASTFunction * node = typeid_cast<ASTFunction *>(&*ast))
if (ASTFunction * node = typeid_cast<ASTFunction *>(ast.get()))
{
if (node->kind == ASTFunction::TABLE_FUNCTION)
{
@ -995,7 +996,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(ASTPtr & node, const Block & sampl
IAST & args = *func->arguments;
ASTPtr & arg = args.children.at(1);
if (!typeid_cast<ASTSet *>(&*arg) && !typeid_cast<ASTSubquery *>(&*arg) && !typeid_cast<ASTIdentifier *>(&*arg))
if (!typeid_cast<ASTSet *>(arg.get()) && !typeid_cast<ASTSubquery *>(arg.get()) && !typeid_cast<ASTIdentifier *>(arg.get()))
{
try
{
@ -1016,8 +1017,8 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
ASTPtr & subquery_or_table_name, const Context & context, size_t subquery_depth, const Names & required_columns)
{
/// Подзапрос или имя таблицы. Имя таблицы аналогично подзапросу SELECT * FROM t.
const ASTSubquery * subquery = typeid_cast<const ASTSubquery *>(&*subquery_or_table_name);
const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*subquery_or_table_name);
const ASTSubquery * subquery = typeid_cast<const ASTSubquery *>(subquery_or_table_name.get());
const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(subquery_or_table_name.get());
if (!subquery && !table)
throw Exception("IN/JOIN supports only SELECT subqueries.", ErrorCodes::BAD_ARGUMENTS);
@ -1119,12 +1120,12 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block)
ASTPtr & arg = args.children.at(1);
/// Уже преобразовали.
if (typeid_cast<ASTSet *>(&*arg))
if (typeid_cast<ASTSet *>(arg.get()))
return;
/// Если подзапрос или имя таблицы для SELECT.
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(&*arg);
if (typeid_cast<ASTSubquery *>(&*arg) || identifier)
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(arg.get());
if (typeid_cast<ASTSubquery *>(arg.get()) || identifier)
{
/// Получаем поток блоков для подзапроса. Создаём Set и кладём на место подзапроса.
String set_id = arg->getColumnName();
@ -1225,7 +1226,7 @@ void ExpressionAnalyzer::makeExplicitSet(ASTFunction * node, const Block & sampl
DataTypes set_element_types;
ASTPtr & left_arg = args.children.at(0);
ASTFunction * left_arg_tuple = typeid_cast<ASTFunction *>(&*left_arg);
ASTFunction * left_arg_tuple = typeid_cast<ASTFunction *>(left_arg.get());
if (left_arg_tuple && left_arg_tuple->name == "tuple")
{
@ -1243,7 +1244,7 @@ void ExpressionAnalyzer::makeExplicitSet(ASTFunction * node, const Block & sampl
else
{
DataTypePtr left_type = sample_block.getByName(left_arg->getColumnName()).type;
if (DataTypeArray * array_type = typeid_cast<DataTypeArray *>(&*left_type))
if (DataTypeArray * array_type = typeid_cast<DataTypeArray *>(left_type.get()))
set_element_types.push_back(array_type->getNestedType());
else
set_element_types.push_back(left_type);
@ -1253,7 +1254,7 @@ void ExpressionAnalyzer::makeExplicitSet(ASTFunction * node, const Block & sampl
bool single_value = false;
ASTPtr elements_ast = arg;
if (ASTFunction * set_func = typeid_cast<ASTFunction *>(&*arg))
if (ASTFunction * set_func = typeid_cast<ASTFunction *>(arg.get()))
{
if (set_func->name == "tuple")
{
@ -1264,8 +1265,8 @@ void ExpressionAnalyzer::makeExplicitSet(ASTFunction * node, const Block & sampl
}
else
{
/// Отличм случай (x, y) in ((1, 2), (3, 4)) от случая (x, y) in (1, 2).
ASTFunction * any_element = typeid_cast<ASTFunction *>(&*set_func->arguments->children.at(0));
/// Отличим случай (x, y) in ((1, 2), (3, 4)) от случая (x, y) in (1, 2).
ASTFunction * any_element = typeid_cast<ASTFunction *>(set_func->arguments->children.at(0).get());
if (set_element_types.size() >= 2 && (!any_element || any_element->name != "tuple"))
single_value = true;
else
@ -1282,7 +1283,7 @@ void ExpressionAnalyzer::makeExplicitSet(ASTFunction * node, const Block & sampl
single_value = true;
}
}
else if (typeid_cast<ASTLiteral *>(&*arg))
else if (typeid_cast<ASTLiteral *>(arg.get()))
{
single_value = true;
}
@ -1436,7 +1437,7 @@ void ExpressionAnalyzer::getArrayJoinedColumns()
const String nested_table_name = ast->getColumnName();
const String nested_table_alias = ast->getAliasOrColumnName();
if (nested_table_alias == nested_table_name && !typeid_cast<const ASTIdentifier *>(&*ast))
if (nested_table_alias == nested_table_name && !typeid_cast<const ASTIdentifier *>(ast.get()))
throw Exception("No alias for non-trivial value in ARRAY JOIN: " + nested_table_name, ErrorCodes::ALIAS_REQUIRED);
if (array_join_alias_to_name.count(nested_table_alias) || aliases.count(nested_table_alias))
@ -1466,7 +1467,7 @@ void ExpressionAnalyzer::getArrayJoinedColumns()
String result_name = expr->getAliasOrColumnName();
/// Это массив.
if (!typeid_cast<ASTIdentifier *>(&*expr) || findColumn(source_name, columns) != columns.end())
if (!typeid_cast<ASTIdentifier *>(expr.get()) || findColumn(source_name, columns) != columns.end())
{
array_join_result_to_source[result_name] = source_name;
}
@ -1495,7 +1496,7 @@ void ExpressionAnalyzer::getArrayJoinedColumns()
/// Заполняет array_join_result_to_source: по каким столбцам-массивам размножить, и как их после этого назвать.
void ExpressionAnalyzer::getArrayJoinedColumnsImpl(ASTPtr ast)
{
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(ast.get()))
{
if (node->kind == ASTIdentifier::Column)
{
@ -1528,7 +1529,7 @@ void ExpressionAnalyzer::getArrayJoinedColumnsImpl(ASTPtr ast)
else
{
for (auto & child : ast->children)
if (!typeid_cast<const ASTSelectQuery *>(&*child))
if (!typeid_cast<const ASTSelectQuery *>(child.get()))
getArrayJoinedColumnsImpl(child);
}
}
@ -1537,11 +1538,11 @@ void ExpressionAnalyzer::getArrayJoinedColumnsImpl(ASTPtr ast)
void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ScopeStack & actions_stack)
{
/// Если результат вычисления уже есть в блоке.
if ((typeid_cast<ASTFunction *>(&*ast) || typeid_cast<ASTLiteral *>(&*ast))
if ((typeid_cast<ASTFunction *>(ast.get()) || typeid_cast<ASTLiteral *>(ast.get()))
&& actions_stack.getSampleBlock().has(ast->getColumnName()))
return;
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(ast.get()))
{
std::string name = node->getColumnName();
if (!only_consts && !actions_stack.getSampleBlock().has(name))
@ -1559,10 +1560,10 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
ErrorCodes::NOT_AN_AGGREGATE);
}
}
else if (ASTFunction * node = typeid_cast<ASTFunction *>(&*ast))
else if (ASTFunction * node = typeid_cast<ASTFunction *>(ast.get()))
{
if (node->kind == ASTFunction::LAMBDA_EXPRESSION)
throw Exception("Unexpected expression", ErrorCodes::UNEXPECTED_EXPRESSION);
throw Exception("Unexpected lambda expression", ErrorCodes::UNEXPECTED_EXPRESSION);
/// Функция arrayJoin.
if (node->kind == ASTFunction::ARRAY_JOIN)
@ -1623,15 +1624,15 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
for (auto & child : node->arguments->children)
{
ASTFunction * lambda = typeid_cast<ASTFunction *>(&*child);
ASTSet * set = typeid_cast<ASTSet *>(&*child);
ASTFunction * lambda = typeid_cast<ASTFunction *>(child.get());
ASTSet * set = typeid_cast<ASTSet *>(child.get());
if (lambda && lambda->name == "lambda")
{
/// Если аргумент - лямбда-выражение, только запомним его примерный тип.
if (lambda->arguments->children.size() != 2)
throw Exception("lambda requires two arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTFunction * lambda_args_tuple = typeid_cast<ASTFunction *>(&*lambda->arguments->children.at(0));
ASTFunction * lambda_args_tuple = typeid_cast<ASTFunction *>(lambda->arguments->children.at(0).get());
if (!lambda_args_tuple || lambda_args_tuple->name != "tuple")
throw Exception("First argument of lambda must be a tuple", ErrorCodes::TYPE_MISMATCH);
@ -1701,17 +1702,17 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
{
ASTPtr child = node->arguments->children[i];
ASTFunction * lambda = typeid_cast<ASTFunction *>(&*child);
ASTFunction * lambda = typeid_cast<ASTFunction *>(child.get());
if (lambda && lambda->name == "lambda")
{
DataTypeExpression * lambda_type = typeid_cast<DataTypeExpression *>(&*argument_types[i]);
ASTFunction * lambda_args_tuple = typeid_cast<ASTFunction *>(&*lambda->arguments->children.at(0));
DataTypeExpression * lambda_type = typeid_cast<DataTypeExpression *>(argument_types[i].get());
ASTFunction * lambda_args_tuple = typeid_cast<ASTFunction *>(lambda->arguments->children.at(0).get());
ASTs lambda_arg_asts = lambda_args_tuple->arguments->children;
NamesAndTypesList lambda_arguments;
for (size_t j = 0; j < lambda_arg_asts.size(); ++j)
{
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(&*lambda_arg_asts[j]);
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(lambda_arg_asts[j].get());
if (!identifier)
throw Exception("lambda argument declarations must be identifiers", ErrorCodes::TYPE_MISMATCH);
@ -1764,7 +1765,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
additional_requirements);
}
}
else if (ASTLiteral * node = typeid_cast<ASTLiteral *>(&*ast))
else if (ASTLiteral * node = typeid_cast<ASTLiteral *>(ast.get()))
{
DataTypePtr type = apply_visitor(FieldToDataType(), node->value);
@ -1783,9 +1784,23 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
}
void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & actions)
void ExpressionAnalyzer::getAggregates(const ASTPtr & ast, ExpressionActionsPtr & actions)
{
ASTFunction * node = typeid_cast<ASTFunction *>(&*ast);
/// Внутри WHERE и PREWHERE не может быть агрегатных функций.
if (select_query && (ast.get() == select_query->where_expression.get() || ast.get() == select_query->prewhere_expression.get()))
{
assertNoAggregates(ast, "in WHERE or PREWHERE");
return;
}
/// Если мы анализируем не запрос SELECT, а отдельное выражение, то в нём не может быть агрегатных функций.
if (!select_query)
{
assertNoAggregates(ast, "in wrong place");
return;
}
const ASTFunction * node = typeid_cast<const ASTFunction *>(ast.get());
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
{
has_aggregation = true;
@ -1797,12 +1812,15 @@ void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & action
if (aggregate_descriptions[i].column_name == aggregate.column_name)
return;
ASTs & arguments = node->arguments->children;
const ASTs & arguments = node->arguments->children;
aggregate.argument_names.resize(arguments.size());
DataTypes types(arguments.size());
for (size_t i = 0; i < arguments.size(); ++i)
{
/// Внутри агрегатных функций не может быть других агрегатных функций.
assertNoAggregates(arguments[i], "inside another aggregate function");
getRootActions(arguments[i], true, false, actions);
const std::string & name = arguments[i]->getColumnName();
types[i] = actions->getSampleBlock().getByName(name).type;
@ -1813,14 +1831,15 @@ void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & action
if (node->parameters)
{
ASTs & parameters = typeid_cast<ASTExpressionList &>(*node->parameters).children;
const ASTs & parameters = typeid_cast<const ASTExpressionList &>(*node->parameters).children;
Array params_row(parameters.size());
for (size_t i = 0; i < parameters.size(); ++i)
{
ASTLiteral * lit = typeid_cast<ASTLiteral *>(&*parameters[i]);
const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(parameters[i].get());
if (!lit)
throw Exception("Parameters to aggregate functions must be literals", ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS);
throw Exception("Parameters to aggregate functions must be literals",
ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS);
params_row[i] = lit->value;
}
@ -1835,15 +1854,27 @@ void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & action
}
else
{
for (size_t i = 0; i < ast->children.size(); ++i)
{
ASTPtr child = ast->children[i];
if (!typeid_cast<ASTSubquery *>(&*child) && !typeid_cast<ASTSelectQuery *>(&*child))
for (const auto & child : ast->children)
if (!typeid_cast<const ASTSubquery *>(child.get()) && !typeid_cast<const ASTSelectQuery *>(child.get()))
getAggregates(child, actions);
}
}
}
void ExpressionAnalyzer::assertNoAggregates(const ASTPtr & ast, const char * description)
{
const ASTFunction * node = typeid_cast<const ASTFunction *>(ast.get());
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
throw Exception("Aggregate function " + node->getColumnName()
+ " is found " + String(description) + " in query", ErrorCodes::ILLEGAL_AGGREGATION);
for (const auto & child : ast->children)
if (!typeid_cast<const ASTSubquery *>(child.get()) && !typeid_cast<const ASTSelectQuery *>(child.get()))
assertNoAggregates(child, description);
}
void ExpressionAnalyzer::assertSelect() const
{
if (!select_query)
@ -1930,7 +1961,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
/// Особый случай - если справа JOIN указано имя таблицы, при чём, таблица имеет тип Join (заранее подготовленное отображение).
/// TODO В этом синтаксисе не поддерживается указание имени БД.
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(&*ast_join.table);
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(ast_join.table.get());
if (identifier)
{
StoragePtr table = context.tryGetTable("", identifier->name);
@ -2087,7 +2118,7 @@ bool ExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only
ASTs asts = select_query->order_expression_list->children;
for (size_t i = 0; i < asts.size(); ++i)
{
ASTOrderByElement * ast = typeid_cast<ASTOrderByElement *>(&*asts[i]);
ASTOrderByElement * ast = typeid_cast<ASTOrderByElement *>(asts[i].get());
if (!ast || ast->children.size() != 1)
throw Exception("Bad order expression AST", ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE);
ASTPtr order_expression = ast->children.at(0);
@ -2138,7 +2169,7 @@ Block ExpressionAnalyzer::getSelectSampleBlock()
void ExpressionAnalyzer::getActionsBeforeAggregation(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries)
{
ASTFunction * node = typeid_cast<ASTFunction *>(&*ast);
ASTFunction * node = typeid_cast<ASTFunction *>(ast.get());
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
for (auto & argument : node->arguments->children)
@ -2157,7 +2188,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
ASTs asts;
if (auto node = typeid_cast<const ASTExpressionList *>(&*ast))
if (auto node = typeid_cast<const ASTExpressionList *>(ast.get()))
asts = node->children;
else
asts = ASTs(1, ast);
@ -2226,7 +2257,7 @@ void ExpressionAnalyzer::collectUsedColumns()
{
/// Игнорируем идентификаторы верхнего уровня из секции ARRAY JOIN.
/// Их потом добавим отдельно.
if (typeid_cast<ASTIdentifier *>(&*expressions[i]))
if (typeid_cast<ASTIdentifier *>(expressions[i].get()))
{
ignored.insert(expressions[i]->getColumnName());
}
@ -2389,7 +2420,7 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast,
* - идентификаторы, доступные из JOIN-а, кладём в required_joined_columns.
*/
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(ast.get()))
{
if (node->kind == ASTIdentifier::Column
&& !ignored_names.count(node->name)
@ -2404,14 +2435,14 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast,
return;
}
if (ASTFunction * node = typeid_cast<ASTFunction *>(&*ast))
if (ASTFunction * node = typeid_cast<ASTFunction *>(ast.get()))
{
if (node->kind == ASTFunction::LAMBDA_EXPRESSION)
{
if (node->arguments->children.size() != 2)
throw Exception("lambda requires two arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTFunction * lambda_args_tuple = typeid_cast<ASTFunction *>(&*node->arguments->children.at(0));
ASTFunction * lambda_args_tuple = typeid_cast<ASTFunction *>(node->arguments->children.at(0).get());
if (!lambda_args_tuple || lambda_args_tuple->name != "tuple")
throw Exception("First argument of lambda must be a tuple", ErrorCodes::TYPE_MISMATCH);
@ -2420,7 +2451,7 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast,
Names added_ignored;
for (auto & child : lambda_args_tuple->arguments->children)
{
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(&*child);
ASTIdentifier * identifier = typeid_cast<ASTIdentifier *>(child.get());
if (!identifier)
throw Exception("lambda argument declarations must be identifiers", ErrorCodes::TYPE_MISMATCH);
@ -2443,7 +2474,7 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast,
}
}
ASTSelectQuery * select = typeid_cast<ASTSelectQuery *>(&*ast);
ASTSelectQuery * select = typeid_cast<ASTSelectQuery *>(ast.get());
/// Рекурсивный обход выражения.
for (auto & child : ast->children)
@ -2451,7 +2482,7 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast,
/** Не пойдем в секцию ARRAY JOIN, потому что там нужно смотреть на имена не-ARRAY-JOIN-енных столбцов.
* Туда collectUsedColumns отправит нас отдельно.
*/
if (!typeid_cast<ASTSubquery *>(&*child) && !typeid_cast<ASTSelectQuery *>(&*child) &&
if (!typeid_cast<ASTSubquery *>(child.get()) && !typeid_cast<ASTSelectQuery *>(child.get()) &&
!(select && child == select->array_join_expression_list))
getRequiredColumnsImpl(child, required_columns, ignored_names, available_joined_columns, required_joined_columns);
}

View File

@ -163,11 +163,12 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
create.children.push_back(new_columns);
create.columns = new_columns;
auto ast_element_for_engine = [](const char * engine)
auto set_engine = [&](const char * engine)
{
storage_name = engine;
ASTFunction * func = new ASTFunction();
func->name = engine;
return func;
create.storage = func;
};
/// Выбор нужного движка таблицы
@ -182,11 +183,11 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
create.storage = typeid_cast<const ASTCreateQuery &>(*context.getCreateQuery(as_database_name, as_table_name)).storage;
}
else if (create.is_temporary)
create.storage = ast_element_for_engine("Memory");
set_engine("Memory");
else if (create.is_view)
create.storage = ast_element_for_engine("View");
set_engine("View");
else if (create.is_materialized_view)
create.storage = ast_element_for_engine("MaterializedView");
set_engine("MaterializedView");
else
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);

View File

@ -1,3 +1,4 @@
#include <memory>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/IParserBase.h>
@ -8,10 +9,16 @@
#include <DB/Parsers/ParserSetQuery.h>
#include <DB/Parsers/ParserSampleRatio.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/ParserTableExpression.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
@ -75,65 +82,27 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
{
ws.ignore(pos, end);
ParserString s_lparen("(");
ParserString s_rparen(")");
ParserString s_dot(".");
ParserIdentifier ident;
ParserFunction table_function;
Pos before = pos;
ParserWithOptionalAlias table_p(std::make_unique<ParserTableExpression>(), true);
if (!table_p.parse(pos, end, select_query->table, max_parsed_pos, expected))
return false;
if (s_lparen.ignore(pos, end, max_parsed_pos, expected))
/// Раскрываем составной идентификатор в имя БД и имя таблицы. NOTE Можно избавиться от этого в будущем.
if (const ASTIdentifier * table_identifier = typeid_cast<const ASTIdentifier *>(select_query->table.get()))
{
ws.ignore(pos, end);
if (table_identifier->children.size() > 2)
throw Exception("Too many components to table. Table may be specified either in database.table or in table form",
ErrorCodes::SYNTAX_ERROR);
ParserSelectQuery select_p;
if (!select_p.parse(pos, end, select_query->table, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (!s_rparen.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
else if (ident.parse(pos, end, select_query->table, max_parsed_pos, expected))
{
/// Если сразу после identifier идет скобка, значит это должна быть табличная функция
if (s_lparen.ignore(pos, end, max_parsed_pos, expected))
if (table_identifier->children.size() == 2)
{
pos = before;
if (!table_function.parse(pos, end, select_query->table, max_parsed_pos, expected))
return false;
if (select_query->table)
typeid_cast<ASTFunction &>(*select_query->table).kind = ASTFunction::TABLE_FUNCTION;
ws.ignore(pos, end);
}
else
{
ws.ignore(pos, end);
if (s_dot.ignore(pos, end, max_parsed_pos, expected))
{
select_query->database = select_query->table;
select_query->database = table_identifier->children.at(0);
typeid_cast<ASTIdentifier &>(*select_query->database).kind = ASTIdentifier::Database;
ws.ignore(pos, end);
if (!ident.parse(pos, end, select_query->table, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
if (select_query->database)
typeid_cast<ASTIdentifier &>(*select_query->database).kind = ASTIdentifier::Database;
select_query->table = table_identifier->children.at(1);
typeid_cast<ASTIdentifier &>(*select_query->table).kind = ASTIdentifier::Table;
}
}
else
return false;
/// Может быть указан алиас. На данный момент, он ничего не значит и не используется.
ParserAlias(true).ignore(pos, end);
ws.ignore(pos, end);
}

View File

@ -0,0 +1,63 @@
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ExpressionListParsers.h>
#include <DB/Parsers/ASTFunction.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/ParserTableExpression.h>
namespace DB
{
bool ParserTableExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
ParserWhiteSpaceOrComments ws;
ParserString s_lparen("(");
ParserString s_rparen(")");
ParserCompoundIdentifier ident;
ParserFunction table_function;
Pos before = pos;
if (s_lparen.ignore(pos, end, max_parsed_pos, expected))
{
/// Подзапрос.
ws.ignore(pos, end);
ParserSelectQuery select_p;
if (!select_p.parse(pos, end, node, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (!s_rparen.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
else if (ident.parse(pos, end, node, max_parsed_pos, expected))
{
/// Если сразу после identifier идет скобка, значит это должна быть табличная функция
if (s_lparen.ignore(pos, end, max_parsed_pos, expected))
{
pos = before;
if (!table_function.parse(pos, end, node, max_parsed_pos, expected))
return false;
if (node)
typeid_cast<ASTFunction &>(*node).kind = ASTFunction::TABLE_FUNCTION;
ws.ignore(pos, end);
}
else
{
ws.ignore(pos, end);
typeid_cast<ASTIdentifier &>(*node).kind = ASTIdentifier::Table;
}
}
else
return false;
return true;
}
}

View File

@ -1,6 +1,6 @@
/var/log/clickhouse-server/clickhouse-server*.log {
rotate 20
size 20M
rotate 100
size 100M
compress
missingok
sharedscripts

View File

@ -1,62 +0,0 @@
ATTACH TABLE hits
(
WatchID UInt64
, JavaEnable UInt8
, Title String
, GoodEvent UInt32
, EventTime DateTime
, CounterID UInt32
, ClientIP UInt32
, RegionID UInt32
, UniqID UInt64
, CounterClass UInt8
, OS UInt8
, UserAgent UInt8
, URL String
, Referer String
, Refresh UInt8
, ResolutionWidth UInt16
, ResolutionHeight UInt16
, ResolutionDepth UInt8
, FlashMajor UInt8
, FlashMinor UInt8
, FlashMinor2 String
, NetMajor UInt8
, NetMinor UInt8
, UserAgentMajor UInt16
, UserAgentMinor FixedString(2)
, CookieEnable UInt8
, JavascriptEnable UInt8
, IsMobile UInt8
, IsTablet UInt8
, MobilePhone UInt8
, MobilePhoneModel String
, Params String
, IPNetworkID UInt32
, TraficSourceID Int8
, SearchEngineID UInt16
, SearchPhrase String
, AdvEngineID UInt8
, IsArtifical UInt8
, WindowClientWidth UInt16
, WindowClientHeight UInt16
, ClientTimeZone Int16
, ClientEventTime DateTime
, SilverlightVersion1 UInt8
, SilverlightVersion2 UInt8
, SilverlightVersion3 UInt32
, SilverlightVersion4 UInt16
, PageCharset String
, CodeVersion UInt32
, IsLink UInt8
, IsDownload UInt8
, IsNotBounce UInt8
, FUniqID UInt64
, OriginalURL String
, HID UInt32
, IsOldCounter UInt8
, IsEvent UInt8
, IsParameter UInt8
, DontCountHits UInt8
, WithHash UInt8
) ENGINE = Log

View File

@ -26,6 +26,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INDEX_NOT_USED;
extern const int SAMPLING_NOT_SUPPORTED;
}
@ -36,7 +37,7 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
/// Построить блок состоящий только из возможных значений виртуальных столбцов
static Block getBlockWithVirtualColumns(const MergeTreeData::DataPartsVector & parts)
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
{
Block res;
ColumnWithTypeAndName _part(new ColumnString, new DataTypeString, "_part");
@ -105,41 +106,69 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
QueryProcessingStage::Enum & processed_stage,
const size_t max_block_size,
const unsigned threads,
size_t * part_index,
size_t * inout_part_index,
Int64 max_block_number_to_read) const
{
size_t part_index_var = 0;
if (!part_index)
part_index = &part_index_var;
if (!inout_part_index)
inout_part_index = &part_index_var;
MergeTreeData::DataPartsVector parts = data.getDataPartsVector();
/// Если в запросе есть ограничения на виртуальный столбец _part, выберем только подходящие под него куски.
Names virt_column_names, real_column_names;
/// Если в запросе есть ограничения на виртуальный столбец _part или _part_index, выберем только подходящие под него куски.
/// В запросе может быть запрошен виртуальный столбец _sample_factor - 1 / использованный коэффициент сэмплирования.
Names virt_column_names;
Names real_column_names;
bool part_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
for (const String & name : column_names_to_return)
if (name != "_part" &&
name != "_part_index")
real_column_names.push_back(name);
else
{
if (name == "_part")
{
part_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_part_index")
{
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
{
sample_factor_column_queried = true;
virt_column_names.push_back(name);
}
else
{
real_column_names.push_back(name);
}
}
NamesAndTypesList available_real_columns = data.getColumnsList();
NamesAndTypesList available_real_and_virtual_columns = available_real_columns;
for (const auto & name : virt_column_names)
available_real_and_virtual_columns.emplace_back(data.getColumn(name));
/// Если в запросе только виртуальные столбцы, надо запросить хотя бы один любой другой.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(data.getColumnsList()));
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
Block virtual_columns_block = getBlockWithVirtualColumns(parts);
/// Если запрошен хотя бы один виртуальный столбец, пробуем индексировать
if (!virt_column_names.empty())
/// Если запрошен виртуальный столбец _part, пробуем использовать его в качестве индекса.
Block virtual_columns_block = getBlockWithPartColumn(parts);
if (part_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
data.check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
PKCondition key_condition(query, context, data.getColumnsList(), data.getSortDescription());
PKCondition date_condition(query, context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
PKCondition key_condition(query, context, available_real_and_virtual_columns, data.getSortDescription());
PKCondition date_condition(query, context, available_real_and_virtual_columns, SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
throw Exception("Primary key is not used and setting 'force_primary_key' is set.", ErrorCodes::INDEX_NOT_USED);
@ -157,7 +186,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
for (const auto & part : prev_parts)
{
if (values.find(part->name) == values.end())
if (part_values.find(part->name) == part_values.end())
continue;
Field left = static_cast<UInt64>(part->left_date);
@ -265,6 +294,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (use_sampling)
{
if (!data.sampling_expression)
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (sample_factor_column_queried && relative_sample_size != 0)
used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);
RelativeSize size_of_universum = 0;
DataTypePtr type = data.getPrimaryExpression()->getSampleBlock().getByName(data.sampling_expression->getColumnName()).type;
@ -277,7 +312,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
else if (typeid_cast<const DataTypeUInt8 *>(type.get()))
size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + 1;
else
throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
if (settings.parallel_replicas_count > 1)
{
@ -376,7 +412,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
filter_function->children.push_back(filter_function->arguments);
}
filter_expression = ExpressionAnalyzer(filter_function, context, nullptr, data.getColumnsList()).getActions(false);
filter_expression = ExpressionAnalyzer(filter_function, context, nullptr, available_real_columns).getActions(false);
/// Добавим столбцы, нужные для sampling_expression.
std::vector<String> add_columns = filter_expression->getRequiredColumns();
@ -400,7 +436,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
String prewhere_column;
if (select.prewhere_expression)
{
ExpressionAnalyzer analyzer(select.prewhere_expression, context, nullptr, data.getColumnsList());
ExpressionAnalyzer analyzer(select.prewhere_expression, context, nullptr, available_real_columns);
prewhere_actions = analyzer.getActions(false);
prewhere_column = select.prewhere_expression->getColumnName();
SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets();
@ -420,7 +456,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
size_t sum_ranges = 0;
for (auto & part : parts)
{
RangesInDataPart ranges(part, (*part_index)++);
RangesInDataPart ranges(part, (*inout_part_index)++);
if (data.mode != MergeTreeData::Unsorted)
ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
@ -487,6 +523,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
for (auto & stream : res)
stream = new FilterBlockInputStream(stream, filter_expression, filter_function->getColumnName());
/// Кстати, если делается распределённый запрос или запрос к Merge-таблице, то в столбце _sample_factor могут быть разные значения.
if (sample_factor_column_queried)
for (auto & stream : res)
stream = new AddingConstColumnBlockInputStream<Float64>(
stream, new DataTypeFloat64, used_sample_factor, "_sample_factor");
return res;
}
@ -548,10 +590,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
prewhere_column, settings, virt_columns
});
if (i == 0)
{
/// Выставим приблизительное количество строк только для первого источника
static_cast<IProfilingBlockInputStream &>(*res.front()).setTotalRowsApprox(total_rows);
}
}
}
else if (sum_marks > 0)
@ -769,7 +812,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
return res;
}
void MergeTreeDataSelectExecutor::createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
void MergeTreeDataSelectExecutor::createPositiveSignCondition(
ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
{
ASTFunction * function = new ASTFunction;
ASTPtr function_ptr = function;

View File

@ -112,8 +112,8 @@ PKCondition::PKCondition(ASTPtr & query, const Context & context, const NamesAnd
}
/** Вычисление выражений, зависящих только от констант.
* Чтобы индекс мог использоваться, если написано, например WHERE Date = toDate(now()).
*/
* Чтобы индекс мог использоваться, если написано, например WHERE Date = toDate(now()).
*/
Block block_with_constants = getBlockWithConstants(query, context, all_columns);
/// Преобразуем секцию WHERE в обратную польскую строку.

View File

@ -55,6 +55,7 @@ namespace ErrorCodes
extern const int RESHARDING_COORDINATOR_DELETED;
extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD;
extern const int RESHARDING_INVALID_QUERY;
extern const int RWLOCK_NO_SUCH_LOCK;
}
namespace
@ -182,8 +183,10 @@ ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & con
zookeeper->createIfNotExists(distributed_online_path, "");
/// Notify that we are online.
(void) zookeeper->tryCreate(distributed_online_path + "/" + current_host, "",
int32_t code = zookeeper->tryCreate(distributed_online_path + "/" + current_host, "",
zkutil::CreateMode::Ephemeral);
if ((code != ZOK) && (code != ZNODEEXISTS))
throw zkutil::KeeperException(code);
distributed_lock_path = distributed_path + "/lock";
zookeeper->createIfNotExists(distributed_lock_path, "");
@ -383,6 +386,8 @@ void ReshardingWorker::perform(const Strings & job_nodes)
zookeeper->remove(child_full_path);
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
zookeeper->remove(child_full_path);
else if (ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK)
zookeeper->remove(child_full_path);
else
zookeeper->remove(child_full_path);
}
@ -410,6 +415,15 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
current_job = ReshardingJob{job_descriptor};
zkutil::RWLock deletion_lock;
if (current_job.isCoordinated())
deletion_lock = createDeletionLock(current_job.coordinator_id);
zkutil::RWLock::Guard<zkutil::RWLock::Read, zkutil::RWLock::NonBlocking> guard{deletion_lock};
if (!deletion_lock.ownsLock())
throw Exception("Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED);
StoragePtr generic_storage = context.getTable(current_job.database_name, current_job.table_name);
auto & storage = typeid_cast<StorageReplicatedMergeTree &>(*(generic_storage.get()));
current_job.storage = &storage;
@ -451,6 +465,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
}
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_ERROR)
{
deletion_lock.release();
hardCleanup();
}
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
@ -483,6 +498,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
auto current_host = getFQDNOrHostName();
setStatus(current_job.coordinator_id, current_host, STATUS_ERROR);
}
deletion_lock.release();
hardCleanup();
}
}
@ -514,6 +530,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
auto current_host = getFQDNOrHostName();
setStatus(current_job.coordinator_id, current_host, STATUS_ERROR);
}
deletion_lock.release();
hardCleanup();
}
}
@ -525,6 +542,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
throw;
}
deletion_lock.release();
hardCleanup();
LOG_DEBUG(log, "Resharding job successfully completed.");
}
@ -990,12 +1008,6 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
zookeeper->set(getCoordinatorPath(coordinator_id) + "/increment", toString(block_number + 1));
}
/// We set up a timeout for this barrier because until we cross it, we don't have
/// any guarantee that all the required nodes for this distributed job are online.
/// We are inside a lightweight function, so it is not an issue.
auto timeout = context.getSettingsRef().resharding_barrier_timeout;
createSubscribeBarrier(coordinator_id).enter(timeout);
return block_number;
}
@ -1171,7 +1183,7 @@ bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const
offline.resize(end - offline.begin());
for (const auto & node : offline)
zookeeper->set(coordinator_id + "/status/" + node,
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status/" + node,
toString(static_cast<UInt64>(STATUS_ON_HOLD)));
return !offline.empty();
@ -1191,6 +1203,7 @@ ReshardingWorker::Status ReshardingWorker::getStatusCommon(const std::string & p
{
/// Note: we don't need any synchronization for the status.
/// That's why we don't acquire any read/write lock.
/// All the operations are either reads or idempotent writes.
auto zookeeper = context.getZooKeeper();
@ -1234,12 +1247,6 @@ void ReshardingWorker::attachJob()
auto zookeeper = context.getZooKeeper();
/// Check if the corresponding coordinator exists. If it doesn't, throw an exception,
/// silently ignore this job, and switch to the next job.
if (!zookeeper->exists(getCoordinatorPath(current_job.coordinator_id)))
throw Exception("Coordinator " + current_job.coordinator_id + " was deleted. Ignoring",
ErrorCodes::RESHARDING_COORDINATOR_DELETED);
auto status = getStatus();
if (status == STATUS_ERROR)
@ -1479,21 +1486,6 @@ zkutil::RWLock ReshardingWorker::createDeletionLock(const std::string & coordina
return lock;
}
zkutil::SingleBarrier ReshardingWorker::createSubscribeBarrier(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();
auto node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");
zkutil::SingleBarrier subscribe_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/subscribe_barrier",
std::stoull(node_count)};
zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
coordinator_id);
subscribe_barrier.setCancellationHook(hook);
return subscribe_barrier;
}
zkutil::SingleBarrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
{
auto zookeeper = context.getZooKeeper();

View File

@ -125,7 +125,13 @@ BlockInputStreams StorageBuffer::read(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
streams_from_dst = destination->read(column_names, query, context, settings, processed_stage, max_block_size, threads);
/** Отключаем оптимизацию "перенос в PREWHERE",
* так как Buffer не поддерживает PREWHERE.
*/
Settings modified_settings = settings;
modified_settings.optimize_move_to_prewhere = false;
streams_from_dst = destination->read(column_names, query, context, modified_settings, processed_stage, max_block_size, threads);
}
BlockInputStreams streams_from_buffers;

View File

@ -246,6 +246,8 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
std::string coordinator_id = resharding_worker.createCoordinator(cluster);
std::atomic<bool> has_notified_error{false};
try
{
/// Создать запрос ALTER TABLE ... RESHARD PARTITION ... COORDINATE WITH ...
@ -317,7 +319,26 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
BlockInputStreams streams = ClusterProxy::Query(alter_query_constructor, cluster, alter_query_ptr,
context, settings, enable_shard_multiplexing).execute();
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections);
/// This callback is called if an exception has occurred while attempting to read a block
/// from a shard. This is to avoid a potential deadlock if other shards are waiting
/// inside a barrier. Actually, even without this solution, we would avoid such a deadlock
/// because we would eventually timeout while trying to read blocks from these other shards.
/// Nevertheless this is not the ideal way of sorting out this issue.
auto exception_callback = [&resharding_worker, coordinator_id, &has_notified_error]()
{
try
{
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
has_notified_error = true;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
};
streams[0] = new UnionBlockInputStream<>(streams, nullptr, settings.max_distributed_connections,
exception_callback);
streams.resize(1);
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
@ -330,7 +351,8 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
while (!stream.isCancelled() && stream.read())
;
stream.readSuffix();
if (!stream.isCancelled())
stream.readSuffix();
}
catch (...)
{
@ -338,7 +360,8 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
try
{
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
if (!has_notified_error)
resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
resharding_worker.deleteCoordinator(coordinator_id);
}
catch (...)

View File

@ -81,6 +81,7 @@ namespace ErrorCodes
extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
extern const int RESHARDING_ALREADY_SUBSCRIBED;
extern const int RESHARDING_INVALID_QUERY;
extern const int RWLOCK_NO_SUCH_LOCK;
}
@ -3507,7 +3508,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
if (has_coordinator)
{
coordinator_id = coordinator.get<const String &>();
deletion_lock = std::move(resharding_worker.createDeletionLock(coordinator_id));
deletion_lock = resharding_worker.createDeletionLock(coordinator_id);
}
zkutil::RWLock::Guard<zkutil::RWLock::Read, zkutil::RWLock::NonBlocking> guard{deletion_lock};
@ -3613,6 +3614,7 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
{
/// Degenerate case: we are the only participating node.
/// All our jobs are uncoordinated.
deletion_lock.release();
resharding_worker.deleteCoordinator(coordinator_id);
uncoordinated_begin = partition_list.cbegin();
}
@ -3672,9 +3674,18 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
(ex.code() == ErrorCodes::RESHARDING_ALREADY_SUBSCRIBED) ||
(ex.code() == ErrorCodes::RESHARDING_INVALID_QUERY))
{
/// Theoretically any of these errors may have occurred because a user
/// has willfully attempted to botch an ongoing distributed resharding job.
/// Consequently we don't take them into account.
/// Any of these errors occurs only when a user attempts to send
/// manually a query ALTER TABLE ... RESHARD ... that specifies
/// the parameter COORDINATE WITH, in spite of the fact that no user
/// should ever use this parameter. Since taking into account such
/// errors may botch an ongoing distributed resharding job, we
/// intentionally ignore them.
}
else if ((ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK) ||
(ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED))
{
/// For any reason the coordinator has disappeared. So obviously
/// we don't have any means to notify other nodes of an error.
}
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)

View File

@ -27,10 +27,11 @@ bool VirtualColumnFactory::hasColumn(const String & name)
DataTypePtr VirtualColumnFactory::tryGetType(const String & name)
{
if (name == "_table") return new DataTypeString;
if (name == "_part") return new DataTypeString;
if (name == "_part_index") return new DataTypeUInt64;
if (name == "_replicated") return new DataTypeUInt8;
if (name == "_table") return new DataTypeString;
if (name == "_part") return new DataTypeString;
if (name == "_part_index") return new DataTypeUInt64;
if (name == "_sample_factor") return new DataTypeFloat64;
if (name == "_replicated") return new DataTypeUInt8;
return nullptr;
}

View File

@ -54,7 +54,7 @@ fi
# Тесты, зависящие от данных Метрики - не публикуются наружу. Создаём симлинки на внутренний репозиторий. Весьма неудобно.
QUERIES_PRIVATE_DIR="../../../private/tests/${QUERIES_DIR}"
QUERIES_PRIVATE_DIR="../../private/tests/${QUERIES_DIR}"
if [ -d "$QUERIES_PRIVATE_DIR" ]; then
for dir in $(ls $QUERIES_PRIVATE_DIR)
do

View File

@ -0,0 +1,18 @@
DROP TABLE IF EXISTS test.sample1;
DROP TABLE IF EXISTS test.sample2;
DROP TABLE IF EXISTS test.sample_merge;
CREATE TABLE test.sample1 (x UInt64, d Date DEFAULT today()) ENGINE = MergeTree(d, intHash64(x), intHash64(x), 10);
CREATE TABLE test.sample2 (x UInt64, d Date DEFAULT today()) ENGINE = MergeTree(d, intHash64(x), intHash64(x), 10);
INSERT INTO test.sample1 (x) SELECT number AS x FROM system.numbers LIMIT 1000000;
INSERT INTO test.sample2 (x) SELECT number AS x FROM system.numbers LIMIT 2000000;
CREATE TABLE test.sample_merge AS test.sample1 ENGINE = Merge(test, '^sample\\d$');
SELECT abs(sum(_sample_factor) - 3000000) / 3000000 < 0.001 FROM test.sample_merge SAMPLE 100000;
SELECT abs(sum(_sample_factor) - 3000000) / 3000000 < 0.001 FROM merge(test, '^sample\\d$') SAMPLE 100000;
DROP TABLE test.sample1;
DROP TABLE test.sample2;
DROP TABLE test.sample_merge;

4
debian/rules vendored
View File

@ -101,7 +101,7 @@ install: build
if [ ! -e "debian/$$DAEMON_PKG-metrika-yandex.postinst" ]; then \
echo "# automatically created" > debian/$$DAEMON_PKG-metrika-yandex.postinst; \
echo "mkdir -p /etc/$$DAEMON_PKG/conf.d" >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
echo "chown metrika: /etc/$$DAEMON_PKG/conf.d" >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
echo "chown -R metrika: /etc/$$DAEMON_PKG" >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
if [ -e "debian/tmp/etc/init.d/$$DAEMON_PKG-metrika-yandex" ]; then \
if echo $$DAEMON_PKG | grep server > /dev/null; then\
echo "update-rc.d $$DAEMON_PKG-metrika-yandex defaults > /dev/null || exit \$$?" >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
@ -112,7 +112,7 @@ install: build
\
else \
echo >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
echo "mkdir -p /etc/$$DAEMON_PKG/conf.d; chown metrika: /etc/$$DAEMON_PKG/conf.d" >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
echo "mkdir -p /etc/$$DAEMON_PKG/conf.d; chown -R metrika: /etc/$$DAEMON_PKG" >> debian/$$DAEMON_PKG-metrika-yandex.postinst; \
fi; \
done

View File

@ -66,7 +66,7 @@ private:
private:
/// Указатель на реализацию для часового пояса по-умолчанию.
DateLUTImpl * default_date_lut_impl;
std::unique_ptr<DateLUTImpl> default_date_lut_impl;
/// Соответствующиая группа часовых поясов по-умолчанию.
size_t default_group_id;
///

View File

@ -0,0 +1,125 @@
#pragma once
#include <stdexcept>
#include <sstream>
/// Часть функциональности - только для сборки из репозитория Метрики.
#ifndef NO_METRIKA
#include <ssqls/IDbObject.h>
#else
struct IDbObject {};
#endif
#include <regex>
#include <DB/Common/StackTrace.h>
#include <DB/IO/WriteBufferFromString.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/AutoPtr.h>
namespace Test
{
template <class Result, class Reference, class Enable = void>
struct Comparator
{
void check(const Result & result, const Reference & reference, const std::string & check_name = "")
{
if (result != reference)
{
std::stringstream ss;
if (check_name.size())
ss << "Check name: " << check_name << ". ";
StackTrace stacktrace;
ss << "Result ";
if (check_name.size())
ss << "for \"" << check_name << "\" ";
ss << "differs from reference" <<
". Result: " << result << ", reference: " << reference << "\n. Stacktrace: " << stacktrace.toString();
throw std::logic_error(ss.str());
}
}
};
template <class Result, class Reference, class Enable = void, class ... Args>
void compare(const Result & result, const Reference & reference, Args && ... args)
{
Comparator<Result, Reference> comp;
comp.check(result, reference, std::forward<Args>(args)...);
}
template <class ContainerA, class ContainerB>
void compareContainers(const ContainerA & result, const ContainerB & reference)
{
if (result != reference)
{
std::stringstream ss;
ss << "Result differs from reference. Result: {";
for (auto a : result)
ss << " " << a;
ss << " }, reference {";
for (auto b : reference)
ss << " " << b;
ss << " }";
StackTrace stacktrace;
ss << "\n. Stacktrace: " << stacktrace.toString();
throw std::logic_error(ss.str());
}
}
template <class Result, class Reference>
struct Comparator<Result, Reference, typename std::enable_if<
std::is_base_of<IDbObject, Result>::value &&
std::is_base_of<IDbObject, Reference>::value>::type>
{
void check(const Result & result, const Reference & reference, const std::string field_name_regexp_str = ".*")
{
std::string res_s;
{
DB::WriteBufferFromString res_b(res_s);
result.writeTSKV(res_b, "", "", "");
}
std::string ref_s;
{
DB::WriteBufferFromString ref_b(ref_s);
reference.writeTSKV(ref_b, "", "", "");
}
size_t ref_pos = 0;
size_t res_pos = 0;
while (ref_pos != std::string::npos && res_pos != std::string::npos)
{
size_t new_ref_pos = ref_s.find('\t', ref_pos);
size_t new_res_pos = res_s.find('\t', res_pos);
auto ref_field = ref_s.substr(ref_pos,
new_ref_pos != std::string::npos ? new_ref_pos - ref_pos : new_ref_pos);
if (std::regex_match(ref_field, std::regex(field_name_regexp_str + "=.*")))
{
auto res_field = res_s.substr(res_pos, new_res_pos != std::string::npos ? new_res_pos - res_pos : new_res_pos);
compare(res_field, ref_field);
}
res_pos = new_res_pos != res_s.size() && new_res_pos != std::string::npos ?
new_res_pos + 1 : std::string::npos;
ref_pos = new_ref_pos != ref_s.size() && new_ref_pos != std::string::npos ?
new_ref_pos + 1 : std::string::npos;
}
}
};
inline void initLogger()
{
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
}
};

View File

@ -84,9 +84,9 @@ DateLUT::DateLUT()
throw Poco::Exception("Failed to get default time zone information.");
default_group_id = it->second;
default_date_lut_impl = new DateLUTImpl(default_time_zone);
default_date_lut_impl.reset(new DateLUTImpl(default_time_zone));
auto & wrapper = (*date_lut_impl_list)[default_group_id];
wrapper.store(default_date_lut_impl, std::memory_order_seq_cst);
wrapper.store(default_date_lut_impl.get(), std::memory_order_seq_cst);
}
const DateLUTImpl & DateLUT::getImplementation(const std::string & time_zone, size_t group_id) const

View File

@ -12,6 +12,6 @@ target_link_libraries (date_lut2 common libicui18n.a libicuuc.a libicudata.a dl)
target_link_libraries (date_lut3 common libicui18n.a libicuuc.a libicudata.a dl)
target_link_libraries (date_lut4 common libicui18n.a libicuuc.a libicudata.a dl)
target_link_libraries (multi_version common libboost_thread.a libboost_system.a rt)
target_link_libraries (json_test statdaemons common libboost_thread.a libboost_system.a rt)
target_link_libraries (json_test dbms libboost_thread.a libboost_system.a rt)
add_check (json_test)

View File

@ -1,4 +1,5 @@
#include <statdaemons/Test.h>
#define NO_METRIKA
#include <common/Test.h>
#include <vector>
#include <string>
#include <common/JSON.h>

View File

@ -165,8 +165,8 @@ protected:
Poco::Event wakeup_event;
/// Поток, в котором принимается сигнал HUP/USR1 для закрытия логов.
Poco::Thread close_logs_thread;
std::unique_ptr<Poco::Runnable> close_logs_listener;
Poco::Thread signal_listener_thread;
std::unique_ptr<Poco::Runnable> signal_listener;
/// Файлы с логами.
Poco::AutoPtr<Poco::FileChannel> log_file;

View File

@ -86,14 +86,31 @@ struct Pipe
Pipe()
{
read_fd = -1;
write_fd = -1;
if (0 != pipe(fds))
DB::throwFromErrno("Cannot create pipe");
}
void close()
{
if (-1 != read_fd)
{
::close(read_fd);
read_fd = -1;
}
if (-1 != write_fd)
{
::close(write_fd);
write_fd = -1;
}
}
~Pipe()
{
close(fds[0]);
close(fds[1]);
close();
}
};
@ -486,7 +503,13 @@ private:
/// Для создания и уничтожения unique_ptr, который в заголовочном файле объявлен от incomplete type.
BaseDaemon::BaseDaemon() = default;
BaseDaemon::~BaseDaemon() = default;
BaseDaemon::~BaseDaemon()
{
signal_pipe.close();
signal_listener_thread.join();
}
void BaseDaemon::terminate()
@ -760,8 +783,8 @@ void BaseDaemon::initialize(Application& self)
/// Выведем ревизию демона
logRevision();
close_logs_listener.reset(new SignalListener);
close_logs_thread.start(*close_logs_listener);
signal_listener.reset(new SignalListener);
signal_listener_thread.start(*signal_listener);
graphite_writer.reset(new GraphiteWriter("graphite"));
}

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int RWLOCK_ALREADY_HELD;
extern const int RWLOCK_NO_SUCH_LOCK;
extern const int ABORTED;
}
@ -57,7 +58,17 @@ RWLock::RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_)
{
int32_t code = zookeeper->tryCreate(path, "", CreateMode::Persistent);
if ((code != ZOK) && (code != ZNODEEXISTS))
throw KeeperException(code);
{
if (code == ZNONODE)
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
else
throw KeeperException(code);
}
}
RWLock::operator bool() const
{
return zookeeper && !path.empty();
}
RWLock::operator bool() const
@ -117,13 +128,23 @@ void RWLock::acquireImpl(Mode mode)
try
{
/// Enqueue a new request for a lock.
key = zookeeper->create(path + "/" + Prefix<lock_type>::name,
"", CreateMode::EphemeralSequential);
int32_t code = zookeeper->tryCreate(path + "/" + Prefix<lock_type>::name,
"", CreateMode::EphemeralSequential, key);
if (code == ZNONODE)
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
else if (code != ZOK)
throw KeeperException(code);
key = key.substr(path.length() + 1);
while (true)
{
auto children = zookeeper->getChildren(path);
std::vector<std::string> children;
int32_t code = zookeeper->tryGetChildren(path, children);
if (code == ZNONODE)
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
else if (code != ZOK)
throw KeeperException(code);
std::sort(children.begin(), children.end(), nodeQueueCmp);
auto it = std::lower_bound(children.cbegin(), children.cend(), key, nodeQueueCmp);
@ -168,7 +189,12 @@ void RWLock::acquireImpl(Mode mode)
if (mode == NonBlocking)
{
zookeeper->remove(path + "/" + key);
int32_t code = zookeeper->tryRemove(path + "/" + key);
if (code == ZNONODE)
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
else if (code != ZOK)
throw KeeperException(code);
key.clear();
break;
}

@ -1 +0,0 @@
Subproject commit f0dd35eadabb7e7c9aa2ab203d54abed693e9b80

View File

@ -6,6 +6,9 @@ CONTROL=debian/control
CHLOG=debian/changelog
CHDATE=$(LC_ALL=C date -R | sed -e 's/,/\\,/g') # Заменим запятую на '\,'
# Собирать пакет с конфигурационными файлами для Яндекс.Метрики.
BUILD_PACKAGE_FOR_METRIKA=$([ -f 'private/Server/metrika/config.xml' ] && echo 'yes')
# Список демонов для сборки может быть указан в аргументах командной строки.
if [ $# -gt 0 ]
then