Roman Peshkurov 2015-09-08 19:39:39 +03:00
commit a16069546f
53 changed files with 1358 additions and 202 deletions

View File

@ -346,17 +346,18 @@ private:
}
protected:
template <typename T1, typename T2>
bool match(T1 & events_it, const T2 events_end) const
template <typename T>
bool match(T & events_it, const T events_end) const
{
const auto action_begin = std::begin(actions);
const auto action_end = std::end(actions);
auto action_it = action_begin;
const auto events_begin = events_it;
auto base_it = events_it;
/// an iterator to the action, plus an iterator to the row in the events list, plus an iterator at the start of the sequence
using backtrack_info = std::tuple<decltype(action_it), decltype(events_it), decltype(base_it)>;
using backtrack_info = std::tuple<decltype(action_it), T, T>;
std::stack<backtrack_info> back_stack;
/// backtrack if possible
@ -473,6 +474,9 @@ protected:
++action_it;
}
if (events_it == events_begin)
++events_it;
return action_it == action_end;
}

View File

@ -443,16 +443,18 @@ public:
bool next()
{
if (!is_initialized)
{
Cell::State::read(in);
DB::readVarUInt(size, in);
is_initialized = true;
}
if (read_count == size)
{
is_eof = true;
return false;
}
else if (read_count == 0)
{
Cell::State::read(in);
DB::readVarUInt(size, in);
}
cell.read(in);
++read_count;
@ -462,18 +464,19 @@ public:
inline const value_type & get() const
{
if ((read_count == 0) || is_eof)
if (!is_initialized || is_eof)
throw DB::Exception("No available data", DB::ErrorCodes::NO_AVAILABLE_DATA);
return cell.getValue();
}
private:
DB::ReadBuffer in;
DB::ReadBuffer & in;
Cell cell;
size_t read_count = 0;
size_t size;
bool is_eof = false;
bool is_initialized = false;
};
class iterator

View File

@ -80,18 +80,21 @@ public:
bool next()
{
if (read_count == size)
{
is_eof = true;
return false;
}
else if (read_count == 0)
if (!is_initialized)
{
Cell::State::read(in);
DB::readVarUInt(size, in);
if (size > capacity)
throw DB::Exception("Illegal size");
is_initialized = true;
}
if (read_count == size)
{
is_eof = true;
return false;
}
cell.read(in);
@ -102,18 +105,19 @@ public:
inline const value_type & get() const
{
if ((read_count == 0) || is_eof)
if (!is_initialized || is_eof)
throw DB::Exception("No available data", DB::ErrorCodes::NO_AVAILABLE_DATA);
return cell.getValue();
}
private:
DB::ReadBuffer in;
DB::ReadBuffer & in;
Cell cell;
size_t read_count = 0;
size_t size;
bool is_eof = false;
bool is_initialized = false;
};
class iterator

View File

@ -53,7 +53,7 @@ public:
size_t hash(const Key & x) const { return Hash::operator()(x); }
/// NOTE Bad for hash tables with more than 2^32 cells.
size_t getBucketFromHash(size_t hash_value) const { return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET; }
static size_t getBucketFromHash(size_t hash_value) { return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET; }
protected:
typename Impl::iterator beginOfNextNonEmptyBucket(size_t & bucket)
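The change above makes getBucketFromHash static, since it only inspects the hash. A minimal standalone sketch of that computation (BITS_FOR_BUCKET = 8 and MAX_BUCKET = 255 are assumed here for a 256-bucket table; they are not taken from the file):

#include <cstddef>
#include <cstdio>

static constexpr size_t BITS_FOR_BUCKET = 8;            /// assumed: 256 buckets
static constexpr size_t MAX_BUCKET = (1 << BITS_FOR_BUCKET) - 1;

static size_t getBucketFromHash(size_t hash_value)
{
    /// The bucket is the high byte of the low 32 bits of the hash,
    /// so no object state is needed - hence the function can be static.
    return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET;
}

int main()
{
    printf("%zu\n", getBucketFromHash(0xABCD1234));     /// prints 171 (0xAB)
}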

View File

@ -45,7 +45,7 @@
*/
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_MAX_QUERY_SIZE 65536
#define DEFAULT_MAX_QUERY_SIZE 262144
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L
#define DEFAULT_MAX_DISTRIBUTED_CONNECTIONS 1024
#define DEFAULT_INTERACTIVE_DELAY 100000

View File

@ -286,6 +286,7 @@ namespace ErrorCodes
NO_AVAILABLE_DATA = 280,
DICTIONARY_IS_EMPTY = 281,
INCORRECT_INDEX = 282,
UNKNOWN_GLOBAL_SUBQUERIES_METHOD = 284,
KEEPER_EXCEPTION = 999,
POCO_EXCEPTION = 1000,

View File

@ -16,7 +16,12 @@ using Poco::SharedPtr;
class LimitBlockInputStream : public IProfilingBlockInputStream
{
public:
LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_ = 0);
/** If always_read_till_end = false (the default), then after enough data has been read,
 * an empty block is returned, which causes query execution to be cancelled.
 * If always_read_till_end = true, all data is read to the end but ignored. This is needed in rare cases:
 * when, otherwise, due to the query being cancelled, we would not receive the data for GROUP BY WITH TOTALS from a remote server.
 */
LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_, bool always_read_till_end_ = false);
String getName() const override { return "Limit"; }
@ -33,7 +38,8 @@ protected:
private:
size_t limit;
size_t offset;
size_t pos;
size_t pos = 0;
bool always_read_till_end;
};
}
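A standalone sketch of the behaviour documented above (plain std types instead of the DB classes; the source is simulated by a callback):

#include <cstddef>
#include <cstdio>
#include <functional>

/// Once the limit is reached, either stop immediately (which cancels the
/// query upstream) or, with always_read_till_end, keep reading and discarding
/// blocks until the source is exhausted.
struct LimitSketch
{
    std::function<bool()> read_block;   /// returns false when the source is exhausted
    size_t limit;
    bool always_read_till_end;
    size_t pos = 0;

    bool next()
    {
        if (pos >= limit)
        {
            if (always_read_till_end)
                while (read_block())    /// drain the source, ignoring the data
                    ;
            return false;
        }
        ++pos;
        return read_block();
    }
};

int main()
{
    size_t produced = 0;
    LimitSketch s{[&] { return ++produced <= 7; }, 3, true};
    while (s.next())
        ;
    printf("%zu\n", produced);          /// 8: three blocks within the limit, then the drain
}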

View File

@ -12,6 +12,19 @@ namespace DB
* This saves memory when two-level aggregation is used, where each thread will produce up to 256 blocks with parts of the result.
*
* Aggregate functions in the blocks must not be finalized, so that their states can be merged.
*
* Notes:
*
* On a fast network (10Gbit) this can work noticeably slower, because blocks from different
* remote servers are read sequentially, and the reading is CPU-bound.
* This is not hard to fix.
*
* Also, reads and computations (merging of states) alternate.
* Reads could be made asynchronous - this would use up to twice as much memory, which is still not much.
* This can be done with UnionBlockInputStream.
*
* Several blocks from each source could be kept in memory instead of one, and the merge could be parallelized.
* This would use proportionally more memory.
*/
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
{
@ -19,20 +32,18 @@ public:
MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
final(final_)
final(final_),
inputs(inputs_.begin(), inputs_.end())
{
children = inputs_;
current_blocks.resize(children.size());
overflow_blocks.resize(children.size());
is_exhausted.resize(children.size());
}
String getName() const override { return "MergingAggregatedMemorySavvy"; }
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
String getID() const override
{
std::stringstream res;
res << "MergingAggregatedMemorySavvy(" << aggregator.getID();
res << "MergingAggregatedMemoryEfficient(" << aggregator.getID();
for (size_t i = 0, size = children.size(); i < size; ++i)
res << ", " << children.back()->getID();
res << ")";
@ -43,68 +54,222 @@ protected:
Block readImpl() override
{
/// If a child is a RemoteBlockInputStream, this sends the query to all remote servers, initiating computation.
if (current_bucket_num == -1)
if (!started)
{
started = true;
for (auto & child : children)
child->readPrefix();
}
/// Everything has been read.
if (current_bucket_num > 255)
return {};
/** We have several sources.
 * Each of them may produce the following kinds of data:
 *
 * 1. A block with a bucket_num set.
 * This means that on the remote server, the data was split into buckets.
 * Data for the same bucket_num from different servers can be merged independently.
 * Moreover, data for different bucket_num will arrive in ascending order.
 *
 * 2. A block with no bucket_num set.
 * This means that on the remote server, the data was not split into buckets.
 * If all servers send such data, all of it can simply be merged together.
 * But if another part of the servers sends data that was split into buckets,
 * then the data that was not split must first be split, and then merged.
 *
 * 3. Blocks with is_overflows set.
 * This is additional data for rows that did not pass max_rows_to_group_by.
 * It must be merged with its own kind, separately.
 */
/// Read the next blocks for current_bucket_num
for (size_t i = 0, size = children.size(); i < size; ++i)
constexpr size_t NUM_BUCKETS = 256;
++current_bucket_num;
for (auto & input : inputs)
{
while (!is_exhausted[i] && (!current_blocks[i] || current_blocks[i].info.bucket_num < current_bucket_num))
{
current_blocks[i] = children[i]->read();
if (input.is_exhausted)
continue;
if (!current_blocks[i])
if (input.block.info.bucket_num >= current_bucket_num)
continue;
/// If a block arrives that carries overflows rather than the main data, remember it and repeat the read.
while (true)
{
// std::cerr << "reading block\n";
Block block = input.stream->read();
if (!block)
{
is_exhausted[i] = true;
// std::cerr << "input is exhausted\n";
input.is_exhausted = true;
break;
}
else if (current_blocks[i].info.is_overflows)
if (block.info.bucket_num != -1)
{
overflow_blocks[i].swap(current_blocks[i]);
/// One of the split blocks of two-level data.
// std::cerr << "block for bucket " << block.info.bucket_num << "\n";
has_two_level = true;
input.block = block;
}
else if (block.info.is_overflows)
{
// std::cerr << "block for overflows\n";
has_overflows = true;
input.overflow_block = block;
continue;
}
else
{
/// A block of unsplit (single-level) data.
// std::cerr << "block without bucket\n";
input.block = block;
}
break;
}
}
/// There may be no blocks for current_bucket_num, while all blocks have a larger bucket_num.
Int32 min_bucket_num = 256;
for (size_t i = 0, size = children.size(); i < size; ++i)
if (!is_exhausted[i] && current_blocks[i].info.bucket_num < min_bucket_num)
min_bucket_num = current_blocks[i].info.bucket_num;
while (true)
{
if (current_bucket_num == NUM_BUCKETS)
{
/// All the main data has been processed. Possibly only overflows-blocks remain.
// std::cerr << "at end\n";
current_bucket_num = min_bucket_num;
if (has_overflows)
{
// std::cerr << "merging overflows\n";
/// All streams are exhausted.
if (current_bucket_num > 255)
return {}; /// TODO overflow_blocks.
has_overflows = false;
BlocksList blocks_to_merge;
/// TODO The case when there are both single_level and two_level blocks.
for (auto & input : inputs)
if (input.overflow_block)
blocks_to_merge.emplace_back(std::move(input.overflow_block));
/// Merge all blocks with current_bucket_num.
return aggregator.mergeBlocks(blocks_to_merge, final);
}
else
return {};
}
else if (has_two_level)
{
/** There is two-level data.
 * Process the bucket numbers in ascending order.
 * Find the smallest bucket number for which there is data,
 * then merge that data.
 */
// std::cerr << "has two level\n";
BlocksList blocks_to_merge;
for (size_t i = 0, size = children.size(); i < size; ++i)
if (current_blocks[i].info.bucket_num == current_bucket_num)
blocks_to_merge.emplace_back(std::move(current_blocks[i]));
int min_bucket_num = NUM_BUCKETS;
Block res = aggregator.mergeBlocks(blocks_to_merge, final);
for (auto & input : inputs)
{
/// Blocks that were split (two-level) from the start.
if (input.block.info.bucket_num != -1 && input.block.info.bucket_num < min_bucket_num)
min_bucket_num = input.block.info.bucket_num;
++current_bucket_num;
return res;
/// A block not yet split into buckets. Split it and put the result into splitted_blocks.
if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
{
LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");
input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
/** The source block must not be destroyed.
 * It owns the Arena with the states of the aggregate functions,
 * and splitted_blocks do not own that Arena but reference those states.
 */
}
/// Blocks obtained by splitting single-level blocks.
if (!input.splitted_blocks.empty())
{
for (const auto & block : input.splitted_blocks)
{
if (block && block.info.bucket_num < min_bucket_num)
{
min_bucket_num = block.info.bucket_num;
break;
}
}
}
}
current_bucket_num = min_bucket_num;
// std::cerr << "current_bucket_num = " << current_bucket_num << "\n";
/// There are no more blocks with the main data.
if (current_bucket_num == NUM_BUCKETS)
continue;
/// Now collect the blocks for current_bucket_num in order to merge them.
BlocksList blocks_to_merge;
for (auto & input : inputs)
{
if (input.block.info.bucket_num == current_bucket_num)
{
// std::cerr << "having block for current_bucket_num\n";
blocks_to_merge.emplace_back(std::move(input.block));
input.block = Block();
}
else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num])
{
// std::cerr << "having splitted data for bucket\n";
blocks_to_merge.emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
input.splitted_blocks[min_bucket_num] = Block();
}
}
return aggregator.mergeBlocks(blocks_to_merge, final);
}
else
{
/// There is only single-level data. Just merge it.
// std::cerr << "don't have two level\n";
BlocksList blocks_to_merge;
for (auto & input : inputs)
if (input.block)
blocks_to_merge.emplace_back(std::move(input.block));
current_bucket_num = NUM_BUCKETS;
return aggregator.mergeBlocks(blocks_to_merge, final);
}
}
}
private:
Aggregator aggregator;
bool final;
Int32 current_bucket_num = -1;
std::vector<Block> current_blocks;
std::vector<UInt8> is_exhausted;
bool started = false;
bool has_two_level = false;
bool has_overflows = false;
int current_bucket_num = -1;
std::vector<Block> overflow_blocks;
struct Input
{
BlockInputStreamPtr stream;
Block block;
Block overflow_block;
std::vector<Block> splitted_blocks;
bool is_exhausted = false;
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
};
std::vector<Input> inputs;
};
}
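A standalone sketch of the merge order implemented above (plain ints stand in for Blocks; each inner vector is the ascending sequence of bucket_num values one input produces):

#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
    /// assumed bucket sequences; real inputs yield bucket_num 0..255 in ascending order
    std::vector<std::vector<int>> inputs = {{0, 3, 7}, {3, 5}, {0, 5, 7}};
    std::vector<size_t> pos(inputs.size(), 0);

    const int NUM_BUCKETS = 256;
    while (true)
    {
        /// find the smallest bucket for which any input still has data
        int min_bucket = NUM_BUCKETS;
        for (size_t i = 0; i < inputs.size(); ++i)
            if (pos[i] < inputs[i].size() && inputs[i][pos[i]] < min_bucket)
                min_bucket = inputs[i][pos[i]];

        if (min_bucket == NUM_BUCKETS)
            break;                      /// all inputs are exhausted

        /// blocks for one bucket can be merged independently of other buckets
        printf("merging bucket %d from inputs:", min_bucket);
        for (size_t i = 0; i < inputs.size(); ++i)
            if (pos[i] < inputs[i].size() && inputs[i][pos[i]] == min_bucket)
            {
                printf(" %zu", i);
                ++pos[i];
            }
        printf("\n");
    }
}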

View File

@ -120,7 +120,7 @@ public:
}
protected:
/// Send all temporary tables to the remote replicas
/// Send all temporary tables to the remote servers.
void sendExternalTables()
{
size_t count = parallel_replicas->size();
@ -151,6 +151,7 @@ protected:
parallel_replicas->sendExternalTablesData(external_tables_data);
}
Block readImpl() override
{
if (!sent_query)

View File

@ -7,6 +7,8 @@
#include <DB/Common/isLocalAddress.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include "writeParenthesisedString.h"
namespace DB
{
@ -93,14 +95,35 @@ private:
WriteBufferFromString out{query};
writeString("SELECT ", out);
writeProbablyBackQuotedString(dict_struct.id_name, out);
if (!dict_struct.id.expression.empty())
{
writeParenthesisedString(dict_struct.id.expression, out);
writeString(" AS ", out);
}
if (!dict_struct.range_min.empty() && !dict_struct.range_max.empty())
writeProbablyBackQuotedString(dict_struct.id.name, out);
if (dict_struct.range_min && dict_struct.range_max)
{
writeString(", ", out);
writeProbablyBackQuotedString(dict_struct.range_min, out);
if (!dict_struct.range_min->expression.empty())
{
writeParenthesisedString(dict_struct.range_min->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_min->name, out);
writeString(", ", out);
writeProbablyBackQuotedString(dict_struct.range_max, out);
if (!dict_struct.range_max->expression.empty())
{
writeParenthesisedString(dict_struct.range_max->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_max->name, out);
}
for (const auto & attr : dict_struct.attributes)
@ -109,7 +132,7 @@ private:
if (!attr.expression.empty())
{
writeString(attr.expression, out);
writeParenthesisedString(attr.expression, out);
writeString(" AS ", out);
}
@ -144,7 +167,13 @@ private:
WriteBufferFromString out{query};
writeString("SELECT ", out);
writeProbablyBackQuotedString(dict_struct.id_name, out);
if (!dict_struct.id.expression.empty())
{
writeParenthesisedString(dict_struct.id.expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
for (const auto & attr : dict_struct.attributes)
{
@ -152,7 +181,7 @@ private:
if (!attr.expression.empty())
{
writeString(attr.expression, out);
writeParenthesisedString(attr.expression, out);
writeString(" AS ", out);
}
@ -175,7 +204,7 @@ private:
writeString(" AND ", out);
}
writeProbablyBackQuotedString(dict_struct.id_name, out);
writeProbablyBackQuotedString(dict_struct.id.name, out);
writeString(" IN (", out);
auto first = true;

View File

@ -21,10 +21,18 @@ Block createSampleBlock(const DictionaryStructure & dict_struct)
ColumnWithTypeAndName{
new ColumnUInt64,
new DataTypeUInt64,
dict_struct.id_name
dict_struct.id.name
}
};
if (dict_struct.range_min)
for (const auto & attribute : { dict_struct.range_min, dict_struct.range_max })
block.insert(ColumnWithTypeAndName{
new ColumnUInt16,
new DataTypeDate,
attribute->name
});
for (const auto & attribute : dict_struct.attributes)
block.insert(ColumnWithTypeAndName{
attribute.type->createColumn(), attribute.type, attribute.name
@ -57,6 +65,12 @@ public:
if ("file" == source_type)
{
if (dict_struct.has_expressions)
throw Exception{
"Dictionary source of type `file` does not support attribute expressions",
ErrorCodes::LOGICAL_ERROR
};
const auto filename = config.getString(config_prefix + ".file.path");
const auto format = config.getString(config_prefix + ".file.format");
return std::make_unique<FileDictionarySource>(filename, format, sample_block, context);

View File

@ -7,6 +7,8 @@
#include <vector>
#include <string>
#include <map>
#include <experimental/optional>
namespace DB
{
@ -112,25 +114,51 @@ struct DictionaryAttribute final
const bool injective;
};
struct DictionarySpecialAttribute final
{
const std::string name;
const std::string expression;
DictionarySpecialAttribute(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: name{config.getString(config_prefix + ".name", "")},
expression{config.getString(config_prefix + ".expression", "")}
{
if (name.empty() && !expression.empty())
throw Exception{
"Element " + config_prefix + ".name is empty",
ErrorCodes::BAD_ARGUMENTS
};
}
};
/// Name of identifier plus list of attributes
struct DictionaryStructure final
{
std::string id_name;
DictionarySpecialAttribute id;
std::vector<DictionaryAttribute> attributes;
std::string range_min;
std::string range_max;
std::experimental::optional<DictionarySpecialAttribute> range_min;
std::experimental::optional<DictionarySpecialAttribute> range_max;
bool has_expressions = false;
DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: id_name{config.getString(config_prefix + ".id.name")},
range_min{config.getString(config_prefix + ".range_min.name", "")},
range_max{config.getString(config_prefix + ".range_max.name", "")}
: id{config, config_prefix + ".id"}
{
if (id_name.empty())
if (id.name.empty())
throw Exception{
"No 'id' specified for dictionary",
ErrorCodes::BAD_ARGUMENTS
};
if (config.has(config_prefix + ".range_min"))
range_min.emplace(config, config_prefix + ".range_min");
if (config.has(config_prefix + ".range_max"))
range_max.emplace(config, config_prefix + ".range_max");
if (!id.expression.empty() ||
(range_min && !range_min->expression.empty()) || (range_max && !range_max->expression.empty()))
has_expressions = true;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto has_hierarchy = false;
@ -148,6 +176,8 @@ struct DictionaryStructure final
const auto underlying_type = getAttributeUnderlyingType(type_string);
const auto expression = config.getString(prefix + "expression", "");
if (!expression.empty())
has_expressions = true;
const auto null_value_string = config.getString(prefix + "null_value");
Field null_value;

View File

@ -6,6 +6,8 @@
#include <mysqlxx/Pool.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <strconvert/escape.h>
#include "writeParenthesisedString.h"
namespace DB
{
@ -134,14 +136,35 @@ private:
WriteBufferFromString out{query};
writeString("SELECT ", out);
writeProbablyBackQuotedString(dict_struct.id_name, out);
if (!dict_struct.id.expression.empty())
{
writeParenthesisedString(dict_struct.id.expression, out);
writeString(" AS ", out);
}
if (!dict_struct.range_min.empty() && !dict_struct.range_max.empty())
writeProbablyBackQuotedString(dict_struct.id.name, out);
if (dict_struct.range_min && dict_struct.range_max)
{
writeString(", ", out);
writeProbablyBackQuotedString(dict_struct.range_min, out);
if (!dict_struct.range_min->expression.empty())
{
writeParenthesisedString(dict_struct.range_min->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_min->name, out);
writeString(", ", out);
writeProbablyBackQuotedString(dict_struct.range_max, out);
if (!dict_struct.range_max->expression.empty())
{
writeParenthesisedString(dict_struct.range_max->expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.range_max->name, out);
}
for (const auto & attr : dict_struct.attributes)
@ -150,7 +173,7 @@ private:
if (!attr.expression.empty())
{
writeString(attr.expression, out);
writeParenthesisedString(attr.expression, out);
writeString(" AS ", out);
}
@ -185,7 +208,13 @@ private:
WriteBufferFromString out{query};
writeString("SELECT ", out);
writeProbablyBackQuotedString(dict_struct.id_name, out);
if (!dict_struct.id.expression.empty())
{
writeParenthesisedString(dict_struct.id.expression, out);
writeString(" AS ", out);
}
writeProbablyBackQuotedString(dict_struct.id.name, out);
for (const auto & attr : dict_struct.attributes)
{
@ -193,7 +222,7 @@ private:
if (!attr.expression.empty())
{
writeString(attr.expression, out);
writeParenthesisedString(attr.expression, out);
writeString(" AS ", out);
}
@ -216,7 +245,7 @@ private:
writeString(" AND ", out);
}
writeProbablyBackQuotedString(dict_struct.id_name, out);
writeProbablyBackQuotedString(dict_struct.id.name, out);
writeString(" IN (", out);
auto first = true;

View File

@ -0,0 +1,18 @@
#pragma once
#include <DB/IO/WriteHelpers.h>
namespace DB
{
inline void writeParenthesisedString(const String & s, WriteBuffer & buf)
{
writeChar('(', buf);
writeString(s, buf);
writeChar(')', buf);
}
}
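A standalone sketch of what the dictionary query builders above produce with this helper (std::string stands in for WriteBuffer; the id expression is made up):

#include <cstdio>
#include <string>

static void writeParenthesisedString(const std::string & s, std::string & buf)
{
    buf += '(';
    buf += s;
    buf += ')';
}

int main()
{
    std::string query = "SELECT ";
    writeParenthesisedString("rand() % 100", query);    /// hypothetical attribute expression
    query += " AS id";
    printf("%s\n", query.c_str());                      /// SELECT (rand() % 100) AS id
}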

View File

@ -147,6 +147,16 @@ private:
return type_res;
}
static const DataTypePtr & getScalarType(const DataTypePtr & type)
{
const auto array = typeid_cast<const DataTypeArray *>(type.get());
if (!array)
return type;
return getScalarType(array->getNestedType());
}
public:
/// Get the result type by the argument types. If the function is not applicable for these arguments, throw an exception.
DataTypePtr getReturnType(const DataTypes & arguments) const
@ -176,12 +186,16 @@ public:
/// Execute the function over a block.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
/// All arguments must be constants.
for (size_t i = 0, size = arguments.size(); i < size; ++i)
if (!block.getByPosition(arguments[i]).column->isConst())
throw Exception("Arguments for function array must be constant.", ErrorCodes::ILLEGAL_COLUMN);
const auto is_const = [&] {
for (const auto arg_num : arguments)
if (!block.getByPosition(arg_num).column->isConst())
return false;
DataTypePtr result_type = block.getByPosition(arguments[0]).type;
return true;
}();
const auto first_arg = block.getByPosition(arguments[0]);
DataTypePtr result_type = first_arg.type;
if (result_type->behavesAsNumber())
{
/// If the type is numeric, compute the least common type
@ -189,16 +203,43 @@ public:
result_type = getLeastCommonType(result_type, block.getByPosition(arguments[i]).type);
}
Array arr;
for (size_t i = 0, size = arguments.size(); i < size; ++i)
if (block.getByPosition(arguments[i]).type->getName() == result_type->getName())
/// If the element has the same type as the result, just add it to the answer
arr.push_back((*block.getByPosition(arguments[i]).column)[0]);
else
/// Otherwise it must be converted to the result type
addField(result_type, (*block.getByPosition(arguments[i]).column)[0], arr);
if (is_const)
{
Array arr;
for (const auto arg_num : arguments)
if (block.getByPosition(arg_num).type->getName() == result_type->getName())
/// If the element has the same type as the result, just add it to the answer
arr.push_back((*block.getByPosition(arg_num).column)[0]);
else
/// Otherwise it must be converted to the result type
addField(result_type, (*block.getByPosition(arg_num).column)[0], arr);
block.getByPosition(result).column = new ColumnConstArray(block.getByPosition(arguments[0]).column->size(), arr, new DataTypeArray(result_type));
block.getByPosition(result).column = new ColumnConstArray{
first_arg.column->size(), arr, new DataTypeArray{result_type}
};
}
else
{
auto out = new ColumnArray{result_type->createColumn()};
ColumnPtr out_ptr{out};
for (const auto row_num : ext::range(0, first_arg.column->size()))
{
Array arr;
for (const auto arg_num : arguments)
if (block.getByPosition(arg_num).type->getName() == result_type->getName())
/// If the element has the same type as the result, just add it to the answer
arr.push_back((*block.getByPosition(arg_num).column)[row_num]);
else
/// Otherwise it must be converted to the result type
addField(result_type, (*block.getByPosition(arg_num).column)[row_num], arr);
out->insert(arr);
}
block.getByPosition(result).column = out_ptr;
}
}
};

View File

@ -24,6 +24,8 @@ namespace DB
* rand - linear congruential generator 0 .. 2^32 - 1.
* rand64 - combines several rand values to obtain values from the range 0 .. 2^64 - 1.
*
* randConstant - a service function that produces a constant column with a random value.
*
* The time is used as the seed.
* Note: it is reinitialized for every block.
* This means the timer must have enough resolution to produce different values for every block.
@ -182,11 +184,60 @@ public:
};
struct NameRand { static constexpr auto name = "rand"; };
struct NameRand64 { static constexpr auto name = "rand64"; };
template <typename Impl, typename Name>
class FunctionRandomConstant : public IFunction
{
private:
typedef typename Impl::ReturnType ToType;
/// The value is the same across different blocks.
bool is_initialized = false;
ToType value;
public:
static constexpr auto name = Name::name;
static IFunction * create(const Context & context) { return new FunctionRandomConstant; }
/// Get the function name.
String getName() const
{
return name;
}
/// Get the result type by the argument types. If the function is not applicable for these arguments, throw an exception.
DataTypePtr getReturnType(const DataTypes & arguments) const
{
if (arguments.size() > 1)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 0 or 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return new typename DataTypeFromFieldType<typename Impl::ReturnType>::Type;
}
/// Execute the function over a block.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
if (!is_initialized)
{
is_initialized = true;
typename ColumnVector<ToType>::Container_t vec_to(1);
Impl::execute(vec_to);
value = vec_to[0];
}
block.getByPosition(result).column = new ColumnConst<ToType>(block.rowsInFirstColumn(), value);
}
};
struct NameRand { static constexpr auto name = "rand"; };
struct NameRand64 { static constexpr auto name = "rand64"; };
struct NameRandConstant { static constexpr auto name = "randConstant"; };
typedef FunctionRandom<RandImpl, NameRand> FunctionRand;
typedef FunctionRandom<Rand64Impl, NameRand64> FunctionRand64;
typedef FunctionRandomConstant<RandImpl, NameRandConstant> FunctionRandConstant;
}
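A standalone sketch of the caching idea behind FunctionRandomConstant (std::random_device stands in for Impl::execute; these are not the actual DB types):

#include <cstdio>
#include <random>

struct RandConstantSketch
{
    bool is_initialized = false;
    unsigned value = 0;

    unsigned get()
    {
        if (!is_initialized)
        {
            is_initialized = true;
            value = std::random_device{}();     /// generated exactly once
        }
        return value;                           /// reused for every subsequent block
    }
};

int main()
{
    RandConstantSketch f;
    printf("%u %u\n", f.get(), f.get());        /// prints the same number twice
}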

View File

@ -180,7 +180,7 @@ inline String likePatternToRegexp(const String & pattern)
break;
case '%':
if (pos + 1 != end)
res += "(?:.|\n)*";
res += ".*";
else
return res;
break;
@ -323,8 +323,13 @@ namespace Regexps
if (known_regexps.end() == it)
it = known_regexps.emplace(pattern, std::make_unique<Holder>()).first;
return it->second->get([&pattern] {
return new Regexp{createRegexp<like>(pattern, no_capture ? OptimizedRegularExpression::RE_NO_CAPTURE : 0)};
return it->second->get([&pattern]
{
int flags = OptimizedRegularExpression::RE_DOT_NL;
if (no_capture)
flags |= OptimizedRegularExpression::RE_NO_CAPTURE;
return new Regexp{createRegexp<like>(pattern, flags)};
});
}
}
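A simplified sketch of the translation above (escaping of regexp metacharacters and the early return for a trailing '%' are omitted): '%' can now map to the cheap ".*" only because the caller compiles the pattern with RE_DOT_NL, which makes '.' match newlines too.

#include <cstdio>
#include <string>

static std::string likeToRegexpSketch(const std::string & pattern)
{
    std::string res = "^";
    for (char c : pattern)
    {
        if (c == '%')
            res += ".*";        /// previously "(?:.|\n)*" to cover newlines
        else if (c == '_')
            res += '.';
        else
            res += c;           /// the real code escapes regexp metacharacters here
    }
    res += '$';
    return res;
}

int main()
{
    printf("%s\n", likeToRegexpSketch("a%b_c").c_str());    /// ^a.*b.c$
}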

View File

@ -731,6 +731,11 @@ public:
*/
Block mergeBlocks(BlocksList & blocks, bool final);
/** Convert (split) a block of partially aggregated data into many blocks, as if the two-level aggregation method had been used.
 * This makes it easier to merge the result with other results that are already two-level.
 */
std::vector<Block> convertBlockToTwoLevel(const Block & block);
using CancellationHook = std::function<bool()>;
/** Set a function that checks whether the current task can be cancelled.
@ -806,7 +811,7 @@ protected:
/** If only column names are given (key_names, as well as aggregates[i].column_name), compute the column numbers.
 * Form a block - a sample of the result.
 */
void initialize(Block & block);
void initialize(const Block & block);
/** Choose the aggregation method based on the number and types of keys. */
AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes);
@ -961,6 +966,16 @@ protected:
Block & block,
AggregatedDataVariants & result) const;
template <typename Method>
void convertBlockToTwoLevelImpl(
Method & method,
Arena * pool,
ConstColumnPlainPtrs & key_columns,
const Sizes & key_sizes,
StringRefs & keys,
const Block & source,
std::vector<Block> & destinations) const;
template <typename Method>
void destroyImpl(
Method & method) const;

View File

@ -44,12 +44,6 @@ class QueryLog;
struct MergeTreeSettings;
/// table name -> table
typedef std::map<String, StoragePtr> Tables;
/// database name -> tables
typedef std::map<String, Tables> Databases;
/// (database name, table name)
typedef std::pair<String, String> DatabaseAndTableName;
@ -272,6 +266,8 @@ public:
private:
const Dictionaries & getDictionariesImpl(bool throw_on_error) const;
const ExternalDictionaries & getExternalDictionariesImpl(bool throw_on_error) const;
StoragePtr getTableImpl(const String & database_name, const String & table_name, Exception * exception) const;
};

View File

@ -163,8 +163,8 @@ private:
* - in the "left" table, it will be accessible under the name expr(x), because the Project action has not been performed yet.
* Both of these variants must be remembered.
*/
NameSet join_key_names_left_set;
NameSet join_key_names_right_set;
Names join_key_names_left;
Names join_key_names_right;
NamesAndTypesList columns_added_by_join;

View File

@ -14,6 +14,7 @@
#include <DB/Common/MemoryTracker.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Interpreters/QueryPriorities.h>
#include <DB/Storages/IStorage.h>
namespace DB
@ -41,6 +42,9 @@ struct ProcessListElement
bool is_cancelled = false;
/// Temporary tables may be registered here. Modify while holding the mutex.
Tables temporary_tables;
ProcessListElement(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
@ -144,6 +148,12 @@ public:
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
max_size = max_size_;
}
/// Register a temporary table. It can then be looked up by query_id and by name.
void addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage);
/// Find a temporary table by query_id and by name. Note: this works poorly if there are different queries with the same query_id.
StoragePtr tryGetTemporaryTable(const String & query_id, const String & table_name) const;
};
}

View File

@ -151,6 +151,9 @@ struct Settings
\
/** Log queries and write the log to a system table. */ \
M(SettingBool, log_queries, 0) \
\
/** Execution scheme for GLOBAL subqueries. */ \
M(SettingGlobalSubqueriesMethod, global_subqueries_method, GlobalSubqueriesMethod::PUSH) \
/// All sorts of limits on query execution.
Limits limits;

View File

@ -555,7 +555,7 @@ struct SettingCompressionMethod
if (s == "zstd")
return CompressionMethod::ZSTD;
throw Exception("Unknown compression method: '" + s + "', must be one of 'quicklz', 'lz4', 'lz4hc', 'zstd' ", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
throw Exception("Unknown compression method: '" + s + "', must be one of 'quicklz', 'lz4', 'lz4hc', 'zstd'", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
String toString() const
@ -596,4 +596,72 @@ struct SettingCompressionMethod
writeBinary(toString(), buf);
}
};
/// Method of executing global distributed subqueries.
enum class GlobalSubqueriesMethod
{
PUSH = 0, /// Send the subquery data to all remote servers.
PULL = 1, /// Remote servers download the subquery data from the query initiator server.
};
struct SettingGlobalSubqueriesMethod
{
GlobalSubqueriesMethod value;
bool changed = false;
SettingGlobalSubqueriesMethod(GlobalSubqueriesMethod x = GlobalSubqueriesMethod::PUSH) : value(x) {}
operator GlobalSubqueriesMethod() const { return value; }
SettingGlobalSubqueriesMethod & operator= (GlobalSubqueriesMethod x) { set(x); return *this; }
static GlobalSubqueriesMethod getGlobalSubqueriesMethod(const String & s)
{
if (s == "push")
return GlobalSubqueriesMethod::PUSH;
if (s == "pull")
return GlobalSubqueriesMethod::PULL;
throw Exception("Unknown global subqueries execution method: '" + s + "', must be one of 'push', 'pull'",
ErrorCodes::UNKNOWN_GLOBAL_SUBQUERIES_METHOD);
}
String toString() const
{
const char * strings[] = { "push", "pull" };
if (value < GlobalSubqueriesMethod::PUSH || value > GlobalSubqueriesMethod::PULL)
throw Exception("Unknown global subqueries execution method", ErrorCodes::UNKNOWN_GLOBAL_SUBQUERIES_METHOD);
return strings[static_cast<size_t>(value)];
}
void set(GlobalSubqueriesMethod x)
{
value = x;
changed = true;
}
void set(const Field & x)
{
set(safeGet<const String &>(x));
}
void set(const String & x)
{
set(getGlobalSubqueriesMethod(x));
}
void set(ReadBuffer & buf)
{
String x;
readBinary(x, buf);
set(x);
}
void write(WriteBuffer & buf) const
{
writeBinary(toString(), buf);
}
};
}
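A standalone restatement of the parsing contract above (kept apart from the DB types so it compiles on its own):

#include <cassert>
#include <stdexcept>
#include <string>

enum class GlobalSubqueriesMethod { PUSH = 0, PULL = 1 };

static GlobalSubqueriesMethod parseGlobalSubqueriesMethod(const std::string & s)
{
    if (s == "push") return GlobalSubqueriesMethod::PUSH;
    if (s == "pull") return GlobalSubqueriesMethod::PULL;
    throw std::runtime_error("Unknown global subqueries execution method: '" + s + "'");
}

int main()
{
    /// e.g. SET global_subqueries_method = 'pull'
    assert(parseGlobalSubqueriesMethod("pull") == GlobalSubqueriesMethod::PULL);
    assert(parseGlobalSubqueriesMethod("push") == GlobalSubqueriesMethod::PUSH);
}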

View File

@ -308,7 +308,14 @@ private:
mutable Poco::RWLock structure_lock;
};
typedef std::vector<StoragePtr> StorageVector;
typedef IStorage::TableStructureReadLocks TableLocks;
using StorageVector = std::vector<StoragePtr>;
using TableLocks = IStorage::TableStructureReadLocks;
/// table name -> table
using Tables = std::map<String, StoragePtr>;
/// database name -> tables
using Databases = std::map<String, Tables>;
}

View File

@ -129,7 +129,7 @@ private:
NamesAndTypesListPtr chooseColumns(Cluster & cluster, const String & database, const String & table, const Context & context) const
{
/// Query to describe the table
String query = "DESC TABLE " + database + "." + table;
String query = "DESC TABLE " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
Settings settings = context.getSettings();
NamesAndTypesList res;

View File

@ -8,8 +8,8 @@ namespace DB
using Poco::SharedPtr;
LimitBlockInputStream::LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_)
: limit(limit_), offset(offset_), pos(0)
LimitBlockInputStream::LimitBlockInputStream(BlockInputStreamPtr input_, size_t limit_, size_t offset_, bool always_read_till_end_)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_)
{
children.push_back(input_);
}
@ -23,7 +23,16 @@ Block LimitBlockInputStream::readImpl()
/// pos - how many rows have been read, including the last block read
if (pos >= offset + limit)
return res;
{
if (!always_read_till_end)
return res;
else
{
while (children.back()->read())
;
return res;
}
}
do
{

View File

@ -132,7 +132,7 @@ int main(int argc, char ** argv)
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, context, Settings(), stage)[0];
in = new ExpressionBlockInputStream(in, expression);
in = new FilterBlockInputStream(in, 4);
//in = new LimitBlockInputStream(in, 10);
//in = new LimitBlockInputStream(in, 10, 0);
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = new TabSeparatedRowOutputStream(ob, expression->getSampleBlock());

View File

@ -152,7 +152,7 @@ int main(int argc, char ** argv)
Poco::SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage, argc == 2 ? atoi(argv[1]) : 1048576)[0];
in = new PartialSortingBlockInputStream(in, sort_columns);
in = new MergeSortingBlockInputStream(in, sort_columns, DEFAULT_BLOCK_SIZE, 0, 0, "");
//in = new LimitBlockInputStream(in, 10);
//in = new LimitBlockInputStream(in, 10, 0);
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr out_ = new TabSeparatedRowOutputStream(ob, sample);

View File

@ -45,7 +45,7 @@ int main(int argc, char ** argv)
streams[i] = new DB::AsynchronousBlockInputStream(streams[i]);
DB::BlockInputStreamPtr stream = new DB::UnionBlockInputStream(streams, nullptr, settings.max_threads);
stream = new DB::LimitBlockInputStream(stream, 10);
stream = new DB::LimitBlockInputStream(stream, 10, 0);
DB::FormatFactory format_factory;
DB::WriteBufferFromFileDescriptor wb(STDERR_FILENO);

View File

@ -8,6 +8,7 @@ void registerFunctionsRandom(FunctionFactory & factory)
{
factory.registerFunction<FunctionRand>();
factory.registerFunction<FunctionRand64>();
factory.registerFunction<FunctionRandConstant>();
}
}

View File

@ -58,7 +58,7 @@ void AggregatedDataVariants::convertToTwoLevel()
}
void Aggregator::initialize(Block & block)
void Aggregator::initialize(const Block & block)
{
if (isCancelled())
return;
@ -1735,17 +1735,162 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
BlocksList merged_block = convertToBlocks(result, final, 1);
BlocksList merged_blocks = convertToBlocks(result, final, 1);
if (merged_block.size() > 1) /// TODO overflows
throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR);
if (merged_blocks.size() > 1)
{
/** There may be two blocks. One with is_overflows, the other without.
 * If there is a non-empty block without is_overflows, remove the block with is_overflows.
 * If there is an empty block without is_overflows and a block with is_overflows, remove the empty block.
 *
 * We do this because we assume that the function is passed
 * either only blocks without is_overflows, or only blocks with is_overflows.
 */
bool has_nonempty_nonoverflows = false;
bool has_overflows = false;
for (const auto & block : merged_blocks)
{
if (block && !block.info.is_overflows)
has_nonempty_nonoverflows = true;
else if (block.info.is_overflows)
has_overflows = true;
}
if (has_nonempty_nonoverflows)
{
for (auto it = merged_blocks.begin(); it != merged_blocks.end(); ++it)
{
if (it->info.is_overflows)
{
merged_blocks.erase(it);
break;
}
}
}
else if (has_overflows)
{
for (auto it = merged_blocks.begin(); it != merged_blocks.end(); ++it)
{
if (!*it)
{
merged_blocks.erase(it);
break;
}
}
}
if (merged_blocks.size() > 1)
throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR);
}
LOG_TRACE(log, "Merged partially aggregated blocks.");
if (merged_block.empty())
if (merged_blocks.empty())
return {};
return merged_block.front();
return merged_blocks.front();
}
template <typename Method>
void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
Method & method,
Arena * pool,
ConstColumnPlainPtrs & key_columns,
const Sizes & key_sizes,
StringRefs & keys,
const Block & source,
std::vector<Block> & destinations) const
{
typename Method::State state;
state.init(key_columns);
size_t rows = source.rowsInFirstColumn();
size_t columns = source.columns();
/// For every row.
for (size_t i = 0; i < rows; ++i)
{
/// Obtain the key. Compute the bucket number from it.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *pool);
auto hash = method.data.hash(key);
auto bucket = method.data.getBucketFromHash(hash);
/// This key is no longer needed.
method.onExistingKey(key, keys, *pool);
Block & dst = destinations[bucket];
if (unlikely(!dst))
{
dst = source.cloneEmpty();
dst.info.bucket_num = bucket;
}
for (size_t j = 0; j < columns; ++j)
dst.unsafeGetByPosition(j).column.get()->insertFrom(*source.unsafeGetByPosition(j).column.get(), i);
}
}
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
{
if (!block)
return {};
initialize(block);
AggregatedDataVariants data;
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
Sizes key_sizes;
/// Remember the columns we will be working with
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.getByPosition(i).column;
AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
#define M(NAME) \
else if (type == AggregatedDataVariants::Type::NAME) \
type = AggregatedDataVariants::Type::NAME ## _two_level;
if (false) {}
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
data.init(type);
size_t num_buckets = 0;
#define M(NAME) \
else if (data.type == AggregatedDataVariants::Type::NAME) \
num_buckets = data.NAME->data.NUM_BUCKETS;
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
std::vector<Block> splitted_blocks(num_buckets);
#define M(NAME) \
else if (data.type == AggregatedDataVariants::Type::NAME) \
convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
key_columns, data.key_sizes, key, block, splitted_blocks);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
return splitted_blocks;
}

View File

@ -366,55 +366,68 @@ StoragePtr Context::tryGetExternalTable(const String & table_name) const
StoragePtr Context::getTable(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (database_name.empty())
{
StoragePtr res;
if ((res = tryGetExternalTable(table_name)))
return res;
if (session_context && (res = session_context->tryGetExternalTable(table_name)))
return res;
if (global_context && (res = global_context->tryGetExternalTable(table_name)))
return res;
}
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
Tables::const_iterator jt = it->second.find(table_name);
if (it->second.end() == jt)
throw Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return jt->second;
Exception exc;
auto res = getTableImpl(database_name, table_name, &exc);
if (!res)
throw exc;
return res;
}
StoragePtr Context::tryGetTable(const String & database_name, const String & table_name) const
{
return getTableImpl(database_name, table_name, nullptr);
}
StoragePtr Context::getTableImpl(const String & database_name, const String & table_name, Exception * exception) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
/** The ability to access the temporary tables of another query in the form _query_QUERY_ID.table
 * NOTE Isolation may need to be considered in the future.
 */
if (database_name.size() > strlen("_query_")
&& database_name.compare(0, strlen("_query_"), "_query_") == 0)
{
String requested_query_id = database_name.substr(strlen("_query_"));
auto res = shared->process_list.tryGetTemporaryTable(requested_query_id, table_name);
if (!res && exception)
*exception = Exception(
"Cannot find temporary table with name " + table_name + " for query with id " + requested_query_id, ErrorCodes::UNKNOWN_TABLE);
return res;
}
if (database_name.empty())
{
StoragePtr res;
if ((res = tryGetExternalTable(table_name)))
return res;
if (session_context && (res = session_context->tryGetExternalTable(table_name)))
return res;
if (global_context && (res = global_context->tryGetExternalTable(table_name)))
StoragePtr res = tryGetExternalTable(table_name);
if (res)
return res;
}
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
return StoragePtr();
{
if (exception)
*exception = Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
return {};
}
Tables::const_iterator jt = it->second.find(table_name);
if (it->second.end() == jt)
return StoragePtr();
{
if (exception)
*exception = Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return {};
}
if (!jt->second)
throw Exception("Logical error: entry for table " + db + "." + table_name + " exists in Context but it is nullptr.", ErrorCodes::LOGICAL_ERROR);
return jt->second;
}
@ -424,7 +437,14 @@ void Context::addExternalTable(const String & table_name, StoragePtr storage)
{
if (external_tables.end() != external_tables.find(table_name))
throw Exception("Temporary table " + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
external_tables[table_name] = storage;
if (process_list_elem)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->process_list.addTemporaryTable(*process_list_elem, table_name, storage);
}
}
@ -590,6 +610,9 @@ void Context::setCurrentDatabase(const String & name)
void Context::setCurrentQueryId(const String & query_id)
{
if (!current_query_id.empty())
throw Exception("Logical error: attempt to set query_id twice", ErrorCodes::LOGICAL_ERROR);
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// If the user did not pass their own query_id, generate one ourselves.
query_id_to_set = shared->uuid_generator.createRandom().toString();
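A standalone sketch of the "_query_" naming convention handled in getTableImpl above (extracting the query id from a database name; the concrete id below is a placeholder):

#include <cstdio>
#include <cstring>
#include <string>

static std::string extractQueryId(const std::string & database_name)
{
    static const char prefix[] = "_query_";
    const size_t prefix_len = strlen(prefix);
    /// an empty result means the name does not use the convention
    if (database_name.size() > prefix_len && database_name.compare(0, prefix_len, prefix) == 0)
        return database_name.substr(prefix_len);
    return {};
}

int main()
{
    printf("%s\n", extractQueryId("_query_4b3d-11ab").c_str());     /// 4b3d-11ab
}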

View File

@ -37,7 +37,7 @@ DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::Ab
if ("range_hashed" == layout_type)
{
if (dict_struct.range_min.empty() || dict_struct.range_max.empty())
if (!dict_struct.range_min || !dict_struct.range_max)
throw Exception{
name + ": dictionary of layout 'range_hashed' requires .structure.range_min and .structure.range_max",
ErrorCodes::BAD_ARGUMENTS
@ -47,7 +47,7 @@ DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::Ab
}
else
{
if (!dict_struct.range_min.empty() || !dict_struct.range_max.empty())
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{
name + ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",

View File

@ -1,3 +1,5 @@
#include <Poco/Util/Application.h>
#include <DB/DataTypes/FieldToDataType.h>
#include <DB/Parsers/ASTFunction.h>
@ -934,6 +936,10 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
{
/// For non-distributed queries, creating temporary tables makes no sense.
if (!(storage && storage->isRemote()))
return;
if (const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*subquery_or_table_name))
{
/// If this is already an external table, nothing needs to be filled in. Just remember that it exists.
@ -958,15 +964,73 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
Block sample = interpreter->getSampleBlock();
NamesAndTypesListPtr columns = new NamesAndTypesList(sample.getColumnsList());
/** Replace the subquery with the name of a temporary table.
 * The query will be sent to the remote server in exactly this form.
 * This temporary table will be sent to the remote server, and on its side,
 * instead of executing the subquery, it will simply be read from.
 */
subquery_or_table_name = new ASTIdentifier(StringRange(), external_table_name, ASTIdentifier::Table);
StoragePtr external_storage = StorageMemory::create(external_table_name, columns);
/** There are two ways of executing distributed GLOBAL subqueries.
 *
 * The push method:
 * The subquery data is sent to all remote servers, where it is then used.
 * With this method, the data is sent as "external tables" and is available on every remote server under a name like _data1.
 * In the query, the subquery is replaced with that name.
 *
 * The pull method:
 * Remote servers download the subquery data from the query initiator server.
 * With this method, the subquery is replaced with another subquery of the form (SELECT * FROM remote('host:port', _query_QUERY_ID, _data1))
 * This subquery, in effect, says "the data must be downloaded from over there".
 *
 * The pull method has an advantage: with it, a remote server can decide that it does not need the data, and skip downloading it in such cases.
 */
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
{
/** Replace the subquery with the name of a temporary table.
 * The query will be sent to the remote server in exactly this form.
 * This temporary table will be sent to the remote server, and on its side,
 * instead of executing the subquery, it will simply be read from.
 */
subquery_or_table_name = new ASTIdentifier(StringRange(), external_table_name, ASTIdentifier::Table);
}
else if (settings.global_subqueries_method == GlobalSubqueriesMethod::PULL)
{
String host_port = getFQDNOrHostName() + ":" + Poco::Util::Application::instance().config().getString("tcp_port");
String database = "_query_" + context.getCurrentQueryId();
auto subquery = new ASTSubquery;
subquery_or_table_name = subquery;
auto select = new ASTSelectQuery;
subquery->children.push_back(select);
auto exp_list = new ASTExpressionList;
select->select_expression_list = exp_list;
select->children.push_back(select->select_expression_list);
Names column_names = external_storage->getColumnNamesList();
for (const auto & name : column_names)
exp_list->children.push_back(new ASTIdentifier({}, name));
auto table_func = new ASTFunction;
select->table = table_func;
select->children.push_back(select->table);
table_func->name = "remote";
auto args = new ASTExpressionList;
table_func->arguments = args;
table_func->children.push_back(table_func->arguments);
auto address_lit = new ASTLiteral({}, host_port);
args->children.push_back(address_lit);
auto database_lit = new ASTLiteral({}, database);
args->children.push_back(database_lit);
auto table_lit = new ASTLiteral({}, external_table_name);
args->children.push_back(table_lit);
}
else
throw Exception("Unknown global subqueries execution method", ErrorCodes::UNKNOWN_GLOBAL_SUBQUERIES_METHOD);
external_tables[external_table_name] = external_storage;
subqueries_for_sets[external_table_name].source = interpreter->execute().in;
subqueries_for_sets[external_table_name].source_sample = interpreter->getSampleBlock();
@ -1780,15 +1844,13 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
if (!subquery_for_set.join)
{
Names join_key_names_left(join_key_names_left_set.begin(), join_key_names_left_set.end());
Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end());
JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness);
Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end());
for (const auto & name_type : columns_added_by_join)
required_joined_columns.push_back(name_type.name);
/** For GLOBAL JOINs, the following happens:
/** For GLOBAL JOINs (in the case of, for example, the push method of executing GLOBAL subqueries), the following happens:
 * - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
 * and in the subquery_for_set object, this subquery is set as the source, with the temporary table _data1 as the table.
 * - in this function, the expression JOIN _data1 is then seen.
@ -2174,27 +2236,31 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
for (const auto & key : keys.children)
{
if (!join_key_names_left_set.insert(key->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (join_key_names_left.end() == std::find(join_key_names_left.begin(), join_key_names_left.end(), key->getColumnName()))
join_key_names_left.push_back(key->getColumnName());
else
throw Exception("Duplicate column " + key->getColumnName() + " in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (!join_key_names_right_set.insert(key->getAliasOrColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (join_key_names_right.end() == std::find(join_key_names_right.begin(), join_key_names_right.end(), key->getAliasOrColumnName()))
join_key_names_right.push_back(key->getAliasOrColumnName());
else
throw Exception("Duplicate column " + key->getAliasOrColumnName() + " in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
}
for (const auto i : ext::range(0, nested_result_sample.columns()))
{
const auto & col = nested_result_sample.getByPosition(i);
if (!join_key_names_right_set.count(col.name))
if (join_key_names_right.end() == std::find(join_key_names_right.begin(), join_key_names_right.end(), col.name))
{
joined_columns.insert(col.name);
joined_columns_name_type.emplace_back(col.name, col.type);
}
}
/* for (const auto & name : join_key_names_left_set)
/* for (const auto & name : join_key_names_left)
std::cerr << "JOIN key (left): " << name << std::endl;
for (const auto & name : join_key_names_right_set)
for (const auto & name : join_key_names_right)
std::cerr << "JOIN key (right): " << name << std::endl;
std::cerr << std::endl;
for (const auto & name : joined_columns)

View File

@ -98,7 +98,7 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi
void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_)
{
if (query.table && typeid_cast<ASTSelectQuery *>(&*query.table))
if (query.table && typeid_cast<ASTSelectQuery *>(query.table.get()))
{
if (table_column_names.empty())
{
@ -107,10 +107,10 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_)
}
else
{
if (query.table && typeid_cast<const ASTFunction *>(&*query.table))
if (query.table && typeid_cast<const ASTFunction *>(query.table.get()))
{
/// Get the table function
TableFunctionPtr table_function_ptr = context.getTableFunctionFactory().get(typeid_cast<const ASTFunction *>(&*query.table)->name, context);
TableFunctionPtr table_function_ptr = context.getTableFunctionFactory().get(typeid_cast<const ASTFunction *>(query.table.get())->name, context);
/// Execute it and remember the result
storage = table_function_ptr->execute(query.table, context);
}
@ -329,7 +329,7 @@ BlockIO InterpreterSelectQuery::execute()
executeUnion();
/// Limits on the result, the quota for the result, and also the progress callback.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(streams[0].get()))
{
/// The limits apply only to the final result.
if (to_stage == QueryProcessingStage::Complete)
@ -590,7 +590,7 @@ void InterpreterSelectQuery::executeSingleQuery()
{
transformStreams([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(&*stream))
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
p_stream->enableExtremes();
});
}
@ -651,7 +651,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
/// The list of columns that must be read to execute the query.
Names required_columns = query_analyzer->getRequiredColumns();
if (query.table && typeid_cast<ASTSelectQuery *>(&*query.table))
if (query.table && typeid_cast<ASTSelectQuery *>(query.table.get()))
{
/** The limits on the maximum size of the result do not apply to a subquery.
 * Because the result of the subquery is not yet the result of the whole query.
@ -792,7 +792,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
transformStreams([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(&*stream))
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->setLimits(limits);
p_stream->setQuota(quota);
@ -1091,9 +1091,38 @@ void InterpreterSelectQuery::executeLimit()
/// If there is a LIMIT
if (query.limit_length)
{
/** A rare case:
 * if there is no WITH TOTALS and there is a subquery in FROM, and at one of its levels there is WITH TOTALS,
 * then when using LIMIT, the data should be read to the end instead of cancelling the query early,
 * because on query cancellation we would not receive the totals data from the remote server.
 */
bool always_read_till_end = false;
if (!query.group_by_with_totals && query.table && typeid_cast<const ASTSelectQuery *>(query.table.get()))
{
const ASTSelectQuery * subquery = static_cast<const ASTSelectQuery *>(query.table.get());
while (subquery->table)
{
if (subquery->group_by_with_totals)
{
/** NOTE We could also check that the table in the subquery is distributed and looks at only one shard.
 * In the remaining cases, totals will be computed on the query initiator server, and reading the data to the end is not necessary.
 */
always_read_till_end = true;
break;
}
if (typeid_cast<const ASTSelectQuery *>(subquery->table.get()))
subquery = static_cast<const ASTSelectQuery *>(subquery->table.get());
else
break;
}
}
transformStreams([&](auto & stream)
{
stream = new LimitBlockInputStream(stream, limit_length, limit_offset);
stream = new LimitBlockInputStream(stream, limit_length, limit_offset, always_read_till_end);
});
}
}
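A minimal standalone sketch (toy Source/LimitStream types invented for illustration, not the real DB classes) of the behaviour the new always_read_till_end argument is meant to enable: once the limit is reached, the stream drains its child instead of returning early, so late-arriving data such as remote totals is still received.

#include <iostream>
#include <optional>
#include <vector>

/// A toy input stream: yields rows, then an out-of-band "totals" value at the very end.
struct Source
{
    std::vector<int> rows{10, 20, 30, 40};
    size_t pos = 0;
    bool totals_received = false;

    std::optional<int> read()
    {
        if (pos < rows.size())
            return rows[pos++];
        totals_received = true;   /// totals arrive only after the input is exhausted
        return {};
    }
};

struct LimitStream
{
    Source & child;
    size_t limit;
    bool always_read_till_end;
    size_t pos = 0;

    std::optional<int> read()
    {
        if (pos >= limit)
        {
            if (always_read_till_end)
                while (child.read())   /// drain the child to the end
                    ;
            return {};
        }
        ++pos;
        return child.read();
    }
};

int main()
{
    Source source;
    LimitStream limited{source, 2, true};
    while (limited.read())
        ;
    std::cout << source.totals_received << '\n';   /// prints 1: totals were still fetched
}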

View File

@ -376,12 +376,18 @@ void Join::setSampleBlock(const Block & block)
sample_block_with_columns_to_add = block;
/// Remove the key columns from sample_block_with_columns_to_add.
for (const auto & name : key_names_right)
/// Move the key columns from sample_block_with_columns_to_add into sample_block_with_keys, preserving their order.
size_t pos = 0;
while (pos < sample_block_with_columns_to_add.columns())
{
size_t pos = sample_block_with_columns_to_add.getPositionByName(name);
sample_block_with_keys.insert(sample_block_with_columns_to_add.unsafeGetByPosition(pos));
sample_block_with_columns_to_add.erase(pos);
const auto & name = sample_block_with_columns_to_add.unsafeGetByPosition(pos).name;
if (key_names_right.end() != std::find(key_names_right.begin(), key_names_right.end(), name))
{
sample_block_with_keys.insert(sample_block_with_columns_to_add.unsafeGetByPosition(pos));
sample_block_with_columns_to_add.erase(pos);
}
else
++pos;
}
for (size_t i = 0, size = sample_block_with_columns_to_add.columns(); i < size; ++i)
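A standalone sketch (plain std::string columns instead of a Block) of the new traversal above: key columns are now extracted in the order they appear in the block rather than in key_names_right order, which is what the USING b, a tests below rely on.

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    /// Columns of the right block, in block order.
    std::vector<std::string> columns_to_add{"a", "x", "b", "y"};
    /// Key names as written in USING - possibly in another order, e.g. USING b, a.
    std::vector<std::string> key_names_right{"b", "a"};

    std::vector<std::string> keys;
    size_t pos = 0;
    while (pos < columns_to_add.size())
    {
        const auto & name = columns_to_add[pos];
        if (std::find(key_names_right.begin(), key_names_right.end(), name) != key_names_right.end())
        {
            keys.push_back(name);                           /// goes to sample_block_with_keys
            columns_to_add.erase(columns_to_add.begin() + pos);
        }
        else
            ++pos;
    }

    for (const auto & k : keys)
        std::cout << k << ' ';                              /// prints "a b": block order is kept
    std::cout << '\n';
}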
@ -426,7 +432,9 @@ bool Join::insertFromBlock(const Block & block)
if (getFullness(kind))
{
/// Move the key columns to the beginning of the block.
/** Move the key columns to the beginning of the block.
 * That is exactly where NonJoinedBlockInputStream will expect them.
 */
size_t key_num = 0;
for (const auto & name : key_names_right)
{
@ -810,6 +818,8 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right)
void Join::joinBlock(Block & block) const
{
// std::cerr << "joinBlock: " << block.dumpStructure() << "\n";
Poco::ScopedReadRWLock lock(rwlock);
checkTypesOfKeys(block, sample_block_with_keys);
@ -917,6 +927,8 @@ public:
result_sample_block = left_sample_block;
// std::cerr << result_sample_block.dumpStructure() << "\n";
/// Add the new columns to the block.
for (size_t i = 0; i < num_columns_right; ++i)
{
@ -932,10 +944,11 @@ public:
{
const String & name = left_sample_block.getByPosition(i).name;
if (parent.key_names_left.end() == std::find(parent.key_names_left.begin(), parent.key_names_left.end(), name))
auto found_key_column = std::find(parent.key_names_left.begin(), parent.key_names_left.end(), name);
if (parent.key_names_left.end() == found_key_column)
column_numbers_left.push_back(i);
else
column_numbers_keys_and_right.push_back(i);
column_numbers_keys_and_right.push_back(found_key_column - parent.key_names_left.begin());
}
for (size_t i = 0; i < num_columns_right; ++i)
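Note on the hunk above: for a key column, the recorded number is now its index within key_names_left rather than its position in the left sample block. Presumably this matches the blocks saved by insertFromBlock, where the key columns were moved to the front in key order, so the mapping stays correct even when the USING order differs from the block order.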
@ -1046,8 +1059,6 @@ private:
for (; it != end; ++it)
{
// std::cerr << it->second.getUsed() << "\n";
if (it->second.getUsed())
continue;

View File

@ -69,4 +69,34 @@ ProcessListEntry::~ProcessListEntry()
parent.have_space.signal();
}
void ProcessList::addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
elem.temporary_tables[table_name] = storage;
}
StoragePtr ProcessList::tryGetTemporaryTable(const String & query_id, const String & table_name) const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// NOTE We search across all users. That is, there is no isolation, and the complexity is O(users).
for (const auto & user_queries : user_to_queries)
{
auto it = user_queries.second.find(query_id);
if (user_queries.second.end() == it)
continue;
auto jt = (*it->second).temporary_tables.find(table_name);
if ((*it->second).temporary_tables.end() == jt)
continue;
return jt->second;
}
return {};
}
}
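A standalone sketch (std::map and a string standing in for the real containers and IStorage; the real method also takes a lock on mutex) of the lookup tryGetTemporaryTable performs:

#include <iostream>
#include <map>
#include <memory>
#include <string>

/// Stand-ins for the real types: the nested maps mirror
/// user_to_queries -> (query_id -> element with temporary_tables).
using StoragePtr = std::shared_ptr<std::string>;
using TemporaryTables = std::map<std::string, StoragePtr>;
using QueryToTables = std::map<std::string, TemporaryTables>;
using UserToQueries = std::map<std::string, QueryToTables>;

StoragePtr tryGetTemporaryTable(const UserToQueries & user_to_queries,
    const std::string & query_id, const std::string & table_name)
{
    /// No per-user isolation: every user's query map is scanned, hence O(users).
    for (const auto & user_queries : user_to_queries)
    {
        auto it = user_queries.second.find(query_id);
        if (it == user_queries.second.end())
            continue;

        auto jt = it->second.find(table_name);
        if (jt != it->second.end())
            return jt->second;
    }
    return {};
}

int main()
{
    UserToQueries queries;
    queries["default"]["query-42"]["_data"] = std::make_shared<std::string>("external table");

    if (auto table = tryGetTemporaryTable(queries, "query-42", "_data"))
        std::cout << *table << '\n';
}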

View File

@ -168,7 +168,10 @@ BlockInputStreams StorageDistributed::read(
if (settings.limits.max_network_bandwidth)
throttler.reset(new Throttler(settings.limits.max_network_bandwidth));
Tables external_tables = context.getExternalTables();
Tables external_tables;
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
external_tables = context.getExternalTables();
/// Loop over the shards.
for (auto & conn_pool : cluster.pools)
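A sketch of the setting this hunk reads. The exact definition is assumed, not shown in this diff; only the PUSH branch appears above, and the new test below sets global_subqueries_method to both 'push' and 'pull'.

/// Assumed shape of the setting:
enum class GlobalSubqueriesMethod
{
    PUSH,   /// the initiator ships temporary tables for GLOBAL IN / GLOBAL JOIN to each shard
    PULL,   /// each shard pulls those tables from the initiator on demand (see tryGetTemporaryTable above)
};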

View File

@ -85,7 +85,7 @@ int main(int argc, char ** argv)
DB::WriteBufferFromOStream out_buf(std::cout);
DB::LimitBlockInputStream in_limit(in, 10);
DB::LimitBlockInputStream in_limit(in, 10, 0);
DB::RowOutputStreamPtr output_ = new DB::TabSeparatedRowOutputStream(out_buf, sample);
DB::BlockOutputStreamFromRowOutputStream output(output_);

View File

@ -206,6 +206,7 @@
31 53574
35 55022
36 53961
1
1 1
3 1
6 1
@ -414,3 +415,4 @@
31 54074
35 54153
36 53999
1

View File

@ -16,6 +16,8 @@ SELECT Y, uniqHLL12(Z) FROM (SELECT number AS X, IPv4NumToString(toUInt32(X)) AS
SELECT Y, uniqHLL12(Z) FROM (SELECT number AS X, IPv4NumToString(toUInt32(X)) AS Z, (3*X*X - 7*X + 11) % 37 AS Y FROM system.numbers LIMIT 3000) GROUP BY Y;
SELECT Y, uniqHLL12(Z) FROM (SELECT number AS X, IPv4NumToString(toUInt32(X)) AS Z, (3*X*X - 7*X + 11) % 37 AS Y FROM system.numbers LIMIT 1000000) GROUP BY Y;
SELECT uniqHLL12(dummy) FROM remote('127.0.0.{1,2}', system.one);
/* uniqCombined */
SELECT Y, uniqCombined(X) FROM (SELECT number AS X, (3*X*X - 7*X + 11) % 37 AS Y FROM system.numbers LIMIT 15) GROUP BY Y;
@ -33,3 +35,5 @@ SELECT Y, uniqCombined(X) FROM (SELECT number AS X, round(toFloat32(1/(1 + (3*X*
SELECT Y, uniqCombined(Z) FROM (SELECT number AS X, IPv4NumToString(toUInt32(X)) AS Z, (3*X*X - 7*X + 11) % 37 AS Y FROM system.numbers LIMIT 15) GROUP BY Y;
SELECT Y, uniqCombined(Z) FROM (SELECT number AS X, IPv4NumToString(toUInt32(X)) AS Z, (3*X*X - 7*X + 11) % 37 AS Y FROM system.numbers LIMIT 3000) GROUP BY Y;
SELECT Y, uniqCombined(Z) FROM (SELECT number AS X, IPv4NumToString(toUInt32(X)) AS Z, (3*X*X - 7*X + 11) % 37 AS Y FROM system.numbers LIMIT 1000000) GROUP BY Y;
SELECT uniqCombined(dummy) FROM remote('127.0.0.{1,2}', system.one);

View File

@ -0,0 +1,6 @@
2 3000
2 3000
1 2000
2 3000
1 2000
2 3000

View File

@ -0,0 +1,6 @@
SELECT a, b FROM (SELECT 1 AS a, 2000 AS b) ANY RIGHT JOIN (SELECT 2 AS a, 3000 AS b) USING a, b;
SELECT a, b FROM (SELECT 1 AS a, 2000 AS b) ANY RIGHT JOIN (SELECT 2 AS a, 3000 AS b) USING b, a;
SELECT a, b FROM (SELECT 1 AS a, 2000 AS b) ANY RIGHT JOIN (SELECT 2 AS a, 3000 AS b UNION ALL SELECT 1 AS a, 2000 AS b) USING a, b;
SELECT a, b FROM (SELECT 1 AS a, 2000 AS b) ANY RIGHT JOIN (SELECT 2 AS a, 3000 AS b UNION ALL SELECT 1 AS a, 2000 AS b) USING b, a;

View File

@ -0,0 +1 @@
SELECT x FROM (SELECT count() AS x FROM remote('127.0.0.1', system.one) WITH TOTALS) LIMIT 1;

View File

@ -0,0 +1,6 @@
2
0 1
0 1
2
0 1
0 1

View File

@ -0,0 +1,6 @@
SET global_subqueries_method = 'push';
SELECT count() FROM remote('127.0.0.{1,2}', system.one) WHERE dummy GLOBAL IN (SELECT 0);
SELECT dummy, x FROM remote('127.0.0.{1,2}', system.one) GLOBAL ANY LEFT JOIN (SELECT 0 AS dummy, 1 AS x) USING dummy;
SET global_subqueries_method = 'pull';
SELECT count() FROM remote('127.0.0.{1,2}', system.one) WHERE dummy GLOBAL IN (SELECT 0);
SELECT dummy, x FROM remote('127.0.0.{1,2}', system.one) GLOBAL ANY LEFT JOIN (SELECT 0 AS dummy, 1 AS x) USING dummy;

View File

@ -0,0 +1,42 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,51 @@
drop table if exists sequence_test;
create table sequence_test (time UInt32, data UInt8) engine=Memory;
insert into sequence_test values (0,0),(1,0),(2,0),(3,0),(4,1),(5,2),(6,0),(7,0),(8,0),(9,0),(10,1),(11,1);
select 1 = sequenceMatch('')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('.')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('.*')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceMatch('(?4)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceMatch('(?1)(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?1)(?1)(?1)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?t>10)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceMatch('(?1)(?t>11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?t<11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?t<3)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?1)(?t<=2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceMatch('(?1)(?t<2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?2)(?t>=7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceMatch('(?2)(?t>7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceMatch('(?2)(?3)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select count() = sequenceCount('')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select count() = sequenceCount('.')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select count() = sequenceCount('.*')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 8 = sequenceCount('(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 3 = sequenceCount('(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceCount('(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceCount('(?4)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 4 = sequenceCount('(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 2 = sequenceCount('(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 2 = sequenceCount('(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceCount('(?1)(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 2 = sequenceCount('(?1)(?1)(?1)(?1)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceCount('(?1)(?t>10)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceCount('(?1)(?t>11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 2 = sequenceCount('(?1)(?t<11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceCount('(?1)(?t<3)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceCount('(?1)(?t<=2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceCount('(?1)(?t<2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceCount('(?2)(?t>=7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 0 = sequenceCount('(?2)(?t>7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
select 1 = sequenceCount('(?2)(?3)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test;
drop table sequence_test;
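For reference, the pattern grammar these tests exercise: (?N) matches an event for which the N-th condition holds, . matches any single event, .* matches any number of events, and forms like (?t>10) or (?t<=2) constrain the time in seconds between the two adjacent matched events; sequenceCount counts non-overlapping matches of the pattern.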

View File

@ -0,0 +1,70 @@
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1

View File

@ -0,0 +1,104 @@
SET max_block_size = 1000;
DROP TABLE IF EXISTS test.numbers_10;
CREATE TABLE test.numbers_10 ENGINE = Memory AS SELECT * FROM system.numbers LIMIT 10000;
SET distributed_aggregation_memory_efficient = 0;
SET group_by_two_level_threshold = 1000;
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SET distributed_aggregation_memory_efficient = 0;
SET group_by_two_level_threshold = 7;
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SET distributed_aggregation_memory_efficient = 1;
SET group_by_two_level_threshold = 1000;
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SET distributed_aggregation_memory_efficient = 1;
SET group_by_two_level_threshold = 7;
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SET distributed_aggregation_memory_efficient = 1;
SET group_by_two_level_threshold = 1;
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number);
SET distributed_aggregation_memory_efficient = 1;
SET group_by_two_level_threshold = 1000;
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SET distributed_aggregation_memory_efficient = 1;
SET group_by_two_level_threshold = 1;
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number);
DROP TABLE test.numbers_10;
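A note on the repetition above: randConstant() is evaluated independently on each shard, so on every run a shard contributes either 5 or 10 keys (in the last two blocks, one of two disjoint halves), and the sum(...) IN (...) assertions accept every valid combination. The ten repetitions per setting exercise memory-efficient distributed aggregation under each group_by_two_level_threshold with differently shaped per-shard results.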

View File

@ -0,0 +1,24 @@
0 2
1 2
2 2
3 2
4 2
5 2
6 2
7 2
8 2
9 2
0 200000
0 2
1 2
2 2
3 2
4 2
5 2
6 2
7 2
8 2
9 2
0 200000

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.numbers_100k_log;
CREATE TABLE test.numbers_100k_log ENGINE = Log AS SELECT * FROM system.numbers LIMIT 100000;
SELECT number, count() FROM remote('127.0.0.{1,2}', test.numbers_100k_log) GROUP BY number WITH TOTALS ORDER BY number LIMIT 10;
SET distributed_aggregation_memory_efficient = 1,
group_by_two_level_threshold = 1000,
group_by_overflow_mode = 'any',
max_rows_to_group_by = 1000,
totals_mode = 'after_having_auto';
SELECT number, count() FROM remote('127.0.0.{1,2}', test.numbers_100k_log) GROUP BY number WITH TOTALS ORDER BY number LIMIT 10;
DROP TABLE test.numbers_100k_log;