This commit is contained in:
Alexey Arno 2015-07-30 20:09:02 +03:00
commit f651046ac8
62 changed files with 1756 additions and 403 deletions

View File

@ -176,6 +176,7 @@ public:
/** Размножить все значения столько раз, сколько прописано в offsets.
* (i-е значение размножается в offsets[i] - offsets[i - 1] значений.)
* Необходимо для реализации операции ARRAY JOIN.
*/
typedef UInt64 Offset_t;
typedef PODArray<Offset_t> Offsets_t;

View File

@ -1,51 +1,17 @@
#pragma once
#include <DB/Common/HashTable/HashMap.h>
/** Хеш-таблица, позволяющая очищать таблицу за O(1).
* Еще более простая, чем HashMap: Key и Mapped должны быть POD-типами.
*
* Вместо этого класса можно было бы просто использовать в HashMap в качестве ключа пару <версия, ключ>,
* но тогда таблица накапливала бы все ключи, которые в нее когда-либо складывали, и неоправданно росла.
* Этот класс идет на шаг дальше и считает ключи со старой версией пустыми местами в хеш-таблице.
*/
struct ClearableHashMapState
{
UInt32 version = 1;
/// Сериализация, в бинарном и текстовом виде.
void write(DB::WriteBuffer & wb) const { DB::writeBinary(version, wb); }
void writeText(DB::WriteBuffer & wb) const { DB::writeText(version, wb); }
/// Десериализация, в бинарном и текстовом виде.
void read(DB::ReadBuffer & rb) { DB::readBinary(version, rb); }
void readText(DB::ReadBuffer & rb) { DB::readText(version, rb); }
};
#include <DB/Common/HashTable/ClearableHashSet.h>
template <typename Key, typename Mapped, typename Hash>
struct ClearableHashMapCell : public HashMapCell<Key, Mapped, Hash, ClearableHashMapState>
struct ClearableHashMapCell : public ClearableHashTableCell<Key, HashMapCell<Key, Mapped, Hash, ClearableHashSetState>>
{
typedef ClearableHashMapState State;
typedef HashMapCell<Key, Mapped, Hash, ClearableHashMapState> Base;
typedef typename Base::value_type value_type;
using Base = ClearableHashTableCell<Key, HashMapCell<Key, Mapped, Hash, ClearableHashSetState>>;
using Base::Base;
UInt32 version;
ClearableHashMapCell() {}
ClearableHashMapCell(const Key & key_, const State & state) : Base(key_, state), version(state.version) {}
ClearableHashMapCell(const value_type & value_, const State & state) : Base(value_, state), version(state.version) {}
bool isZero(const State & state) const { return version != state.version; }
static bool isZero(const Key & key, const State & state) { return false; }
/// Установить значение ключа в ноль.
void setZero() { version = 0; }
/// Нужно ли хранить нулевой ключ отдельно (то есть, могут ли в хэш-таблицу вставить нулевой ключ).
static constexpr bool need_zero_value_storage = false;
ClearableHashMapCell(const typename Base::value_type & value_, const typename Base::State & state)
: Base::BaseCell(value_, state), Base::version(state.version) {}
};

View File

@ -0,0 +1,70 @@
#pragma once
#include <type_traits>
#include <DB/Common/HashTable/HashSet.h>
/** Хеш-таблица, позволяющая очищать таблицу за O(1).
* Еще более простая, чем HashSet: Key и Mapped должны быть POD-типами.
*
* Вместо этого класса можно было бы просто использовать в HashSet в качестве ключа пару <версия, ключ>,
* но тогда таблица накапливала бы все ключи, которые в нее когда-либо складывали, и неоправданно росла.
* Этот класс идет на шаг дальше и считает ключи со старой версией пустыми местами в хеш-таблице.
*/
struct ClearableHashSetState
{
UInt32 version = 1;
/// Сериализация, в бинарном и текстовом виде.
void write(DB::WriteBuffer & wb) const { DB::writeBinary(version, wb); }
void writeText(DB::WriteBuffer & wb) const { DB::writeText(version, wb); }
/// Десериализация, в бинарном и текстовом виде.
void read(DB::ReadBuffer & rb) { DB::readBinary(version, rb); }
void readText(DB::ReadBuffer & rb) { DB::readText(version, rb); }
};
template <typename Key, typename BaseCell>
struct ClearableHashTableCell : public BaseCell
{
typedef ClearableHashSetState State;
typedef typename BaseCell::value_type value_type;
UInt32 version;
bool isZero(const State & state) const { return version != state.version; }
static bool isZero(const Key & key, const State & state) { return false; }
/// Установить значение ключа в ноль.
void setZero() { version = 0; }
/// Нужно ли хранить нулевой ключ отдельно (то есть, могут ли в хэш-таблицу вставить нулевой ключ).
static constexpr bool need_zero_value_storage = false;
ClearableHashTableCell() {}
ClearableHashTableCell(const Key & key_, const State & state) : BaseCell(key_, state), version(state.version) {}
};
template
<
typename Key,
typename Hash = DefaultHash<Key>,
typename Grower = HashTableGrower<>,
typename Allocator = HashTableAllocator
>
class ClearableHashSet : public HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>
{
public:
typedef Key key_type;
typedef typename ClearableHashSet::cell_type::value_type value_type;
void clear()
{
++this->version;
this->m_size = 0;
}
};

View File

@ -102,9 +102,7 @@ struct HashMapCellWithSavedHash : public HashMapCell<Key, TMapped, Hash, TState>
size_t saved_hash;
HashMapCellWithSavedHash() : Base() {}
HashMapCellWithSavedHash(const Key & key_, const typename Base::State & state) : Base(key_, state) {}
HashMapCellWithSavedHash(const typename Base::value_type & value_, const typename Base::State & state) : Base(value_, state) {}
using Base::Base;
bool keyEquals(const Key & key_) const { return this->value.first == key_; }
bool keyEquals(const Key & key_, size_t hash_) const { return saved_hash == hash_ && this->value.first == key_; }

View File

@ -14,10 +14,10 @@ class FormatFactory
{
public:
BlockInputStreamPtr getInput(const String & name, ReadBuffer & buf,
Block & sample, size_t max_block_size) const;
const Block & sample, size_t max_block_size) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
Block & sample) const;
const Block & sample) const;
};
}

View File

@ -21,17 +21,21 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
public:
/** Столбцы из key_names и аргументы агрегатных функций, уже должны быть вычислены.
*/
ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const Names & key_names,
const AggregateDescriptions & aggregates, bool overflow_row_, bool final_, size_t max_threads_,
ParallelAggregatingBlockInputStream(
BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end,
const Names & key_names, const AggregateDescriptions & aggregates,
bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
compiler_, min_count_to_compile_, group_by_two_level_threshold_),
final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(aggregator.getNumberOfKeys()), aggregates_size(aggregator.getNumberOfAggregates()),
handler(*this), processor(inputs, max_threads, handler)
handler(*this), processor(inputs, additional_input_at_end, max_threads, handler)
{
children.insert(children.end(), inputs.begin(), inputs.end());
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
}
String getName() const override { return "ParallelAggregating"; }

View File

@ -45,8 +45,16 @@ template <typename Handler>
class ParallelInputsProcessor
{
public:
ParallelInputsProcessor(BlockInputStreams inputs_, size_t max_threads_, Handler & handler_)
: inputs(inputs_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
/** additional_input_at_end - если не nullptr,
* то из этого источника начинают доставаться блоки лишь после того, как все остальные источники обработаны.
* Это делается в основном потоке.
*
* Предназначено для реализации FULL и RIGHT JOIN
* - где нужно сначала параллельно сделать JOIN, при этом отмечая, какие ключи не найдены,
* и только после завершения этой работы, создать блоки из ненайденных ключей.
*/
ParallelInputsProcessor(BlockInputStreams inputs_, BlockInputStreamPtr additional_input_at_end_, size_t max_threads_, Handler & handler_)
: inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_)
{
for (size_t i = 0; i < inputs_.size(); ++i)
available_inputs.emplace(inputs_[i], i);
@ -150,6 +158,25 @@ private:
/// Последний поток при выходе сообщает, что данных больше нет.
if (0 == --active_threads)
{
/// И ещё обрабатывает дополнительный источник, если такой есть.
if (additional_input_at_end)
{
try
{
while (Block block = additional_input_at_end->read())
handler.onBlock(block, thread_num);
}
catch (...)
{
exception = cloneCurrentException();
}
if (exception)
{
handler.onException(exception, thread_num);
}
}
handler.onFinish();
}
}
@ -206,6 +233,7 @@ private:
}
BlockInputStreams inputs;
BlockInputStreamPtr additional_input_at_end;
unsigned max_threads;
Handler & handler;

View File

@ -84,6 +84,16 @@ public:
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
{
std::lock_guard<std::mutex> lock(external_tables_mutex);
/// Останавливаем отправку внешних данных.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(elem.first.get()))
stream->cancel();
}
if (!isQueryPending() || hasThrownException())
return;
@ -107,27 +117,30 @@ protected:
{
size_t count = parallel_replicas->size();
std::vector<ExternalTablesData> instances;
instances.reserve(count);
for (size_t i = 0; i < count; ++i)
{
ExternalTablesData res;
for (const auto & table : external_tables)
std::lock_guard<std::mutex> lock(external_tables_mutex);
external_tables_data.reserve(count);
for (size_t i = 0; i < count; ++i)
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else
res.push_back(std::make_pair(input[0], table.first));
ExternalTablesData res;
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else
res.push_back(std::make_pair(input[0], table.first));
}
external_tables_data.push_back(std::move(res));
}
instances.push_back(std::move(res));
}
parallel_replicas->sendExternalTablesData(instances);
parallel_replicas->sendExternalTablesData(external_tables_data);
}
Block readImpl() override
@ -302,6 +315,10 @@ private:
QueryProcessingStage::Enum stage;
Context context;
/// Потоки для чтения из временных таблиц - для последующей отправки данных на удалённые серверы для GLOBAL-подзапросов.
std::vector<ExternalTablesData> external_tables_data;
std::mutex external_tables_mutex;
/// Установили соединения с репликами, но ещё не отправили запрос.
std::atomic<bool> established { false };

View File

@ -28,12 +28,14 @@ using Poco::SharedPtr;
class UnionBlockInputStream : public IProfilingBlockInputStream
{
public:
UnionBlockInputStream(BlockInputStreams inputs, size_t max_threads) :
UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads) :
output_queue(std::min(inputs.size(), max_threads)),
handler(*this),
processor(inputs, max_threads, handler)
processor(inputs, additional_input_at_end, max_threads, handler)
{
children = inputs;
if (additional_input_at_end)
children.push_back(additional_input_at_end);
}
String getName() const override { return "Union"; }

View File

@ -33,14 +33,21 @@ namespace DB
* has(arr, x) - есть ли в массиве элемент x.
* indexOf(arr, x) - возвращает индекс элемента x (начиная с 1), если он есть в массиве, или 0, если его нет.
* arrayEnumerate(arr) - возаращает массив [1,2,3,..., length(arr)]
* arrayEnumerateUniq(arr) - возаращает массив, параллельный данному, где для каждого элемента указано,
* какой он по счету среди элементов с таким значением.
* Например: arrayEnumerateUniq([10, 20, 10, 30]) = [1, 1, 2, 1]
*
* arrayUniq(arr) - считает количество разных элементов в массиве,
* arrayUniq(arr1, arr2, ...) - считает количество разных кортежей из элементов на соответствующих позициях в нескольких массивах.
*
* arrayEnumerateUniq(arr)
* - возаращает массив, параллельный данному, где для каждого элемента указано,
* какой он по счету среди элементов с таким значением.
* Например: arrayEnumerateUniq([10, 20, 10, 30]) = [1, 1, 2, 1]
* arrayEnumerateUniq(arr1, arr2...)
* - для кортежей из элементов на соответствующих позициях в нескольких массивах.
*
* emptyArrayToSingle(arr) - заменить пустые массивы на массивы из одного элемента со значением "по-умолчанию".
*/
class FunctionArray : public IFunction
{
public:
@ -995,11 +1002,11 @@ public:
{
const ColumnArray::Offsets_t & offsets = array->getOffsets();
ColumnVector<UInt32> * res_nested = new ColumnVector<UInt32>;
ColumnUInt32 * res_nested = new ColumnUInt32;
ColumnArray * res_array = new ColumnArray(res_nested, array->getOffsetsColumn());
block.getByPosition(result).column = res_array;
ColumnVector<UInt32>::Container_t & res_values = res_nested->getData();
ColumnUInt32::Container_t & res_values = res_nested->getData();
res_values.resize(array->getData().size());
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
@ -1034,6 +1041,239 @@ public:
}
};
/// Считает количество разных элементов в массиве, или количество разных кортежей из элементов на соответствующих позициях в нескольких массивах.
/// NOTE Реализация частично совпадает с arrayEnumerateUniq.
class FunctionArrayUniq : public IFunction
{
public:
static constexpr auto name = "arrayUniq";
static IFunction * create(const Context & context) { return new FunctionArrayUniq; }
/// Получить имя функции.
String getName() const
{
return name;
}
/// Получить типы результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
DataTypePtr getReturnType(const DataTypes & arguments) const
{
if (arguments.size() == 0)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be at least 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < arguments.size(); ++i)
{
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*arguments[i]);
if (!array_type)
throw Exception("All arguments for function " + getName() + " must be arrays; argument " + toString(i + 1) + " isn't.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return new DataTypeUInt32;
}
/// Выполнить функцию над блоком.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
if (arguments.size() == 1 && executeConst(block, arguments, result))
return;
Columns array_columns(arguments.size());
const ColumnArray::Offsets_t * offsets = nullptr;
ConstColumnPlainPtrs data_columns(arguments.size());
for (size_t i = 0; i < arguments.size(); ++i)
{
ColumnPtr array_ptr = block.getByPosition(arguments[i]).column;
const ColumnArray * array = typeid_cast<const ColumnArray *>(&*array_ptr);
if (!array)
{
const ColumnConstArray * const_array = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[i]).column);
if (!const_array)
throw Exception("Illegal column " + block.getByPosition(arguments[i]).column->getName()
+ " of " + toString(i + 1) + "-th argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
array_ptr = const_array->convertToFullColumn();
array = typeid_cast<const ColumnArray *>(&*array_ptr);
}
array_columns[i] = array_ptr;
const ColumnArray::Offsets_t & offsets_i = array->getOffsets();
if (!i)
offsets = &offsets_i;
else if (offsets_i != *offsets)
throw Exception("Lengths of all arrays passsed to " + getName() + " must be equal.",
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
data_columns[i] = &array->getData();
}
const ColumnArray * first_array = typeid_cast<const ColumnArray *>(&*array_columns[0]);
ColumnUInt32 * res = new ColumnUInt32;
block.getByPosition(result).column = res;
ColumnUInt32::Container_t & res_values = res->getData();
res_values.resize(offsets->size());
if (arguments.size() == 1)
{
if (!( executeNumber<UInt8> (first_array, res_values)
|| executeNumber<UInt16> (first_array, res_values)
|| executeNumber<UInt32> (first_array, res_values)
|| executeNumber<UInt64> (first_array, res_values)
|| executeNumber<Int8> (first_array, res_values)
|| executeNumber<Int16> (first_array, res_values)
|| executeNumber<Int32> (first_array, res_values)
|| executeNumber<Int64> (first_array, res_values)
|| executeNumber<Float32> (first_array, res_values)
|| executeNumber<Float64> (first_array, res_values)
|| executeString (first_array, res_values)))
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
else
{
if (!execute128bit(*offsets, data_columns, res_values))
executeHashed(*offsets, data_columns, res_values);
}
}
private:
/// Изначально выделить кусок памяти для 512 элементов.
static constexpr size_t INITIAL_SIZE_DEGREE = 9;
template <typename T>
bool executeNumber(const ColumnArray * array, ColumnUInt32::Container_t & res_values)
{
const ColumnVector<T> * nested = typeid_cast<const ColumnVector<T> *>(&array->getData());
if (!nested)
return false;
const ColumnArray::Offsets_t & offsets = array->getOffsets();
const typename ColumnVector<T>::Container_t & values = nested->getData();
typedef ClearableHashSet<T, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(T)> > Set;
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
set.insert(values[j]);
res_values[i] = set.size();
prev_off = off;
}
return true;
}
bool executeString(const ColumnArray * array, ColumnUInt32::Container_t & res_values)
{
const ColumnString * nested = typeid_cast<const ColumnString *>(&array->getData());
if (!nested)
return false;
const ColumnArray::Offsets_t & offsets = array->getOffsets();
typedef ClearableHashSet<StringRef, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(StringRef)> > Set;
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
set.insert(nested->getDataAt(j));
res_values[i] = set.size();
prev_off = off;
}
return true;
}
bool executeConst(Block & block, const ColumnNumbers & arguments, size_t result)
{
const ColumnConstArray * array = typeid_cast<const ColumnConstArray *>(&*block.getByPosition(arguments[0]).column);
if (!array)
return false;
const Array & values = array->getData();
std::set<Field> set;
for (size_t i = 0; i < values.size(); ++i)
set.insert(values[i]);
block.getByPosition(result).column = new ColumnConstUInt32(array->size(), set.size());
return true;
}
bool execute128bit(
const ColumnArray::Offsets_t & offsets,
const ConstColumnPlainPtrs & columns,
ColumnUInt32::Container_t & res_values)
{
size_t count = columns.size();
size_t keys_bytes = 0;
Sizes key_sizes(count);
for (size_t j = 0; j < count; ++j)
{
if (!columns[j]->isFixed())
return false;
key_sizes[j] = columns[j]->sizeOfField();
keys_bytes += key_sizes[j];
}
if (keys_bytes > 16)
return false;
typedef ClearableHashSet<UInt128, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)> > Set;
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
set.insert(packFixed<UInt128>(j, count, columns, key_sizes));
res_values[i] = set.size();
prev_off = off;
}
return true;
}
void executeHashed(
const ColumnArray::Offsets_t & offsets,
const ConstColumnPlainPtrs & columns,
ColumnUInt32::Container_t & res_values)
{
size_t count = columns.size();
typedef ClearableHashSet<UInt128, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)> > Set;
Set set;
size_t prev_off = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
set.clear();
size_t off = offsets[i];
for (size_t j = prev_off; j < off; ++j)
set.insert(hash128(j, count, columns));
res_values[i] = set.size();
prev_off = off;
}
}
};
class FunctionArrayEnumerateUniq : public IFunction
{
public:
@ -1100,11 +1340,11 @@ public:
}
const ColumnArray * first_array = typeid_cast<const ColumnArray *>(&*array_columns[0]);
ColumnVector<UInt32> * res_nested = new ColumnVector<UInt32>;
ColumnUInt32 * res_nested = new ColumnUInt32;
ColumnArray * res_array = new ColumnArray(res_nested, first_array->getOffsetsColumn());
block.getByPosition(result).column = res_array;
ColumnVector<UInt32>::Container_t & res_values = res_nested->getData();
ColumnUInt32::Container_t & res_values = res_nested->getData();
if (!offsets->empty())
res_values.resize(offsets->back());
@ -1137,7 +1377,7 @@ private:
static constexpr size_t INITIAL_SIZE_DEGREE = 9;
template <typename T>
bool executeNumber(const ColumnArray * array, ColumnVector<UInt32>::Container_t & res_values)
bool executeNumber(const ColumnArray * array, ColumnUInt32::Container_t & res_values)
{
const ColumnVector<T> * nested = typeid_cast<const ColumnVector<T> *>(&array->getData());
if (!nested)
@ -1163,7 +1403,7 @@ private:
return true;
}
bool executeString(const ColumnArray * array, ColumnVector<UInt32>::Container_t & res_values)
bool executeString(const ColumnArray * array, ColumnUInt32::Container_t & res_values)
{
const ColumnString * nested = typeid_cast<const ColumnString *>(&array->getData());
if (!nested)
@ -1211,7 +1451,7 @@ private:
bool execute128bit(
const ColumnArray::Offsets_t & offsets,
const ConstColumnPlainPtrs & columns,
ColumnVector<UInt32>::Container_t & res_values)
ColumnUInt32::Container_t & res_values)
{
size_t count = columns.size();
size_t keys_bytes = 0;
@ -1248,7 +1488,7 @@ private:
void executeHashed(
const ColumnArray::Offsets_t & offsets,
const ConstColumnPlainPtrs & columns,
ColumnVector<UInt32>::Container_t & res_values)
ColumnUInt32::Container_t & res_values)
{
size_t count = columns.size();
@ -1457,13 +1697,263 @@ private:
};
class FunctionEmptyArrayToSingle : public IFunction
{
public:
static constexpr auto name = "emptyArrayToSingle";
static IFunction * create(const Context & context) { return new FunctionEmptyArrayToSingle; }
/// Получить имя функции.
String getName() const
{
return name;
}
/// Получить типы результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
DataTypePtr getReturnType(const DataTypes & arguments) const
{
if (arguments.size() != 1)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(arguments[0].get());
if (!array_type)
throw Exception("Argument for function " + getName() + " must be array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arguments[0]->clone();
}
/// Выполнить функцию над блоком.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
if (executeConst(block, arguments, result))
return;
const ColumnArray * array = typeid_cast<const ColumnArray *>(block.getByPosition(arguments[0]).column.get());
if (!array)
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
ColumnPtr res_ptr = array->cloneEmpty();
block.getByPosition(result).column = res_ptr;
ColumnArray & res = static_cast<ColumnArray &>(*res_ptr);
const IColumn & src_data = array->getData();
const ColumnArray::Offsets_t & src_offsets = array->getOffsets();
IColumn & res_data = res.getData();
ColumnArray::Offsets_t & res_offsets = res.getOffsets();
if (!( executeNumber<UInt8> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<UInt16> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<UInt32> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<UInt64> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<Int8> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<Int16> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<Int32> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<Int64> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<Float32> (src_data, src_offsets, res_data, res_offsets)
|| executeNumber<Float64> (src_data, src_offsets, res_data, res_offsets)
|| executeString (src_data, src_offsets, res_data, res_offsets)
|| executeFixedString (src_data, src_offsets, res_data, res_offsets)))
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
private:
bool executeConst(Block & block, const ColumnNumbers & arguments, size_t result)
{
if (const ColumnConstArray * const_array = typeid_cast<const ColumnConstArray *>(block.getByPosition(arguments[0]).column.get()))
{
if (const_array->getData().empty())
{
auto nested_type = typeid_cast<const DataTypeArray &>(*block.getByPosition(arguments[0]).type).getNestedType();
block.getByPosition(result).column = new ColumnConstArray(
block.rowsInFirstColumn(),
{nested_type->getDefault()},
nested_type->clone());
}
else
block.getByPosition(result).column = block.getByPosition(arguments[0]).column;
return true;
}
else
return false;
}
template <typename T>
bool executeNumber(
const IColumn & src_data, const ColumnArray::Offsets_t & src_offsets,
IColumn & res_data_col, ColumnArray::Offsets_t & res_offsets)
{
if (const ColumnVector<T> * src_data_concrete = typeid_cast<const ColumnVector<T> *>(&src_data))
{
const PODArray<T> & src_data = src_data_concrete->getData();
PODArray<T> & res_data = typeid_cast<ColumnVector<T> &>(res_data_col).getData();
size_t size = src_offsets.size();
res_offsets.resize(size);
res_data.reserve(src_data.size());
ColumnArray::Offset_t src_prev_offset = 0;
ColumnArray::Offset_t res_prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
if (src_offsets[i] != src_prev_offset)
{
size_t size_to_write = src_offsets[i] - src_prev_offset;
size_t prev_res_data_size = res_data.size();
res_data.resize(prev_res_data_size + size_to_write);
memcpy(&res_data[prev_res_data_size], &src_data[src_prev_offset], size_to_write * sizeof(T));
res_prev_offset += size_to_write;
res_offsets[i] = res_prev_offset;
}
else
{
res_data.push_back(T());
++res_prev_offset;
res_offsets[i] = res_prev_offset;
}
src_prev_offset = src_offsets[i];
}
return true;
}
else
return false;
}
bool executeFixedString(
const IColumn & src_data, const ColumnArray::Offsets_t & src_offsets,
IColumn & res_data_col, ColumnArray::Offsets_t & res_offsets)
{
if (const ColumnFixedString * src_data_concrete = typeid_cast<const ColumnFixedString *>(&src_data))
{
const size_t n = src_data_concrete->getN();
const ColumnFixedString::Chars_t & src_data = src_data_concrete->getChars();
ColumnFixedString::Chars_t & res_data = typeid_cast<ColumnFixedString &>(res_data_col).getChars();
size_t size = src_offsets.size();
res_offsets.resize(size);
res_data.reserve(src_data.size());
ColumnArray::Offset_t src_prev_offset = 0;
ColumnArray::Offset_t res_prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
if (src_offsets[i] != src_prev_offset)
{
size_t size_to_write = src_offsets[i] - src_prev_offset;
size_t prev_res_data_size = res_data.size();
res_data.resize(prev_res_data_size + size_to_write * n);
memcpy(&res_data[prev_res_data_size], &src_data[src_prev_offset], size_to_write * n);
res_prev_offset += size_to_write;
res_offsets[i] = res_prev_offset;
}
else
{
size_t prev_res_data_size = res_data.size();
res_data.resize(prev_res_data_size + n);
memset(&res_data[prev_res_data_size], 0, n);
++res_prev_offset;
res_offsets[i] = res_prev_offset;
}
src_prev_offset = src_offsets[i];
}
return true;
}
else
return false;
}
bool executeString(
const IColumn & src_data, const ColumnArray::Offsets_t & src_array_offsets,
IColumn & res_data_col, ColumnArray::Offsets_t & res_array_offsets)
{
if (const ColumnString * src_data_concrete = typeid_cast<const ColumnString *>(&src_data))
{
const ColumnString::Offsets_t & src_string_offsets = src_data_concrete->getOffsets();
ColumnString::Offsets_t & res_string_offsets = typeid_cast<ColumnString &>(res_data_col).getOffsets();
const ColumnString::Chars_t & src_data = src_data_concrete->getChars();
ColumnString::Chars_t & res_data = typeid_cast<ColumnString &>(res_data_col).getChars();
size_t size = src_array_offsets.size();
res_array_offsets.resize(size);
res_string_offsets.reserve(src_string_offsets.size());
res_data.reserve(src_data.size());
ColumnArray::Offset_t src_array_prev_offset = 0;
ColumnArray::Offset_t res_array_prev_offset = 0;
ColumnString::Offset_t src_string_prev_offset = 0;
ColumnString::Offset_t res_string_prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
if (src_array_offsets[i] != src_array_prev_offset)
{
size_t array_size = src_array_offsets[i] - src_array_prev_offset;
size_t bytes_to_copy = 0;
size_t from_string_prev_offset_local = src_string_prev_offset;
for (size_t j = 0; j < array_size; ++j)
{
size_t string_size = src_string_offsets[src_array_prev_offset + j] - from_string_prev_offset_local;
res_string_prev_offset += string_size;
res_string_offsets.push_back(res_string_prev_offset);
from_string_prev_offset_local += string_size;
bytes_to_copy += string_size;
}
size_t res_data_old_size = res_data.size();
res_data.resize(res_data_old_size + bytes_to_copy);
memcpy(&res_data[res_data_old_size], &src_data[src_string_prev_offset], bytes_to_copy);
res_array_prev_offset += array_size;
res_array_offsets[i] = res_array_prev_offset;
}
else
{
res_data.push_back(0); /// Пустая строка, включая ноль на конце.
++res_string_prev_offset;
res_string_offsets.push_back(res_string_prev_offset);
++res_array_prev_offset;
res_array_offsets[i] = res_array_prev_offset;
}
src_array_prev_offset = src_array_offsets[i];
if (src_array_prev_offset)
src_string_prev_offset = src_string_offsets[src_array_prev_offset - 1];
}
return true;
}
else
return false;
}
};
struct NameHas { static constexpr auto name = "has"; };
struct NameIndexOf { static constexpr auto name = "indexOf"; };
struct NameCountEqual { static constexpr auto name = "countEqual"; };
typedef FunctionArrayIndex<IndexToOne, NameHas> FunctionHas;
typedef FunctionArrayIndex<IndexToOne, NameHas> FunctionHas;
typedef FunctionArrayIndex<IndexIdentity, NameIndexOf> FunctionIndexOf;
typedef FunctionArrayIndex<IndexCount, NameCountEqual> FunctionCountEqual;
typedef FunctionArrayIndex<IndexCount, NameCountEqual> FunctionCountEqual;
using FunctionEmptyArrayUInt8 = FunctionEmptyArray<DataTypeUInt8>;
using FunctionEmptyArrayUInt16 = FunctionEmptyArray<DataTypeUInt16>;

View File

@ -404,31 +404,36 @@ struct ExtractURLParameterImpl
{
size_t cur_offset = offsets[i];
const char * str = reinterpret_cast<const char *>(&data[prev_offset]);
const char * pos = nullptr;
do
const char * begin = strchr(str, '?');
if (begin != nullptr)
{
const char * str = reinterpret_cast<const char *>(&data[prev_offset]);
const char * begin = strchr(str, '?');
if (begin == nullptr)
break;
pos = strstr(begin + 1, param_str);
if (pos == nullptr)
break;
if (pos != begin + 1 && *(pos - 1) != ';' && *(pos - 1) != '&')
pos = begin + 1;
while (true)
{
pos = nullptr;
break;
}
pos = strstr(pos, param_str);
pos += param_len;
} while (false);
if (pos == nullptr)
break;
if (pos[-1] != '?' && pos[-1] != '&')
{
pos += param_len;
continue;
}
else
{
pos += param_len;
break;
}
}
}
if (pos != nullptr)
{
const char * end = strpbrk(pos, "&;#");
const char * end = strpbrk(pos, "&#");
if (end == nullptr)
end = pos + strlen(pos);

View File

@ -66,6 +66,7 @@ public:
/// Для ARRAY_JOIN
NameSet array_joined_columns;
bool array_join_is_left;
/// Для JOIN
const Join * join = nullptr;
@ -122,13 +123,14 @@ public:
return a;
}
static ExpressionAction arrayJoin(const NameSet & array_joined_columns)
static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left)
{
if (array_joined_columns.empty())
throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
ExpressionAction a;
a.type = ARRAY_JOIN;
a.array_joined_columns = array_joined_columns;
a.array_join_is_left = array_join_is_left;
return a;
}

View File

@ -226,6 +226,10 @@ private:
/// Превратить перечисление значений или подзапрос в ASTSet. node - функция in или notIn.
void makeSet(ASTFunction * node, const Block & sample_block);
/// Замена скалярных подзапросов на значения-константы.
void executeScalarSubqueries();
void executeScalarSubqueriesImpl(ASTPtr & ast);
/// Находит глобальные подзапросы в секциях GLOBAL IN/JOIN. Заполняет external_tables.
void initGlobalSubqueriesAndExternalTables();
void initGlobalSubqueries(ASTPtr & ast);

View File

@ -24,8 +24,7 @@ public:
*/
BlockIO execute() override
{
executeImpl(false);
return {};
return executeImpl(false);
}
/** assume_metadata_exists - не проверять наличие файла с метаданными и не создавать его
@ -45,7 +44,7 @@ public:
const ColumnDefaults & column_defaults);
private:
void executeImpl(bool assume_metadata_exists);
BlockIO executeImpl(bool assume_metadata_exists);
/// AST в список столбцов с типами. Столбцы типа Nested развернуты в список настоящих столбцов.
using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>;

View File

@ -109,7 +109,7 @@ private:
// Переименовать столбцы каждого запроса цепочки UNION ALL в такие же имена, как в первом запросе.
void renameColumns();
/** Из какой таблицы читать. При JOIN, возвращается "левая" таблицы.
/** Из какой таблицы читать. При JOIN, возвращается "левая" таблица.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
@ -120,22 +120,43 @@ private:
/// Разные стадии выполнения запроса.
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
QueryProcessingStage::Enum executeFetchColumns();
void executeWhere(ExpressionActionsPtr expression);
void executeAggregation(ExpressionActionsPtr expression, bool overflow_row, bool final);
void executeMergeAggregated(bool overflow_row, bool final);
void executeTotalsAndHaving(bool has_having, ExpressionActionsPtr expression, bool overflow_row);
void executeHaving(ExpressionActionsPtr expression);
void executeExpression(ExpressionActionsPtr expression);
void executeOrder();
void executeMergeSorted();
void executePreLimit();
void executeUnion();
void executeLimit();
void executeProjection(ExpressionActionsPtr expression);
void executeDistinct(bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
template <typename Transform>
void transformStreams(Transform && transform)
{
for (auto & stream : streams)
transform(stream);
if (stream_with_non_joined_data)
transform(stream_with_non_joined_data);
}
bool hasNoData() const
{
return streams.empty() && !stream_with_non_joined_data;
}
bool hasMoreThanOneStream() const
{
return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
}
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, bool overflow_row, bool final);
void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final);
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression, bool overflow_row);
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOrder( BlockInputStreams & streams);
void executeMergeSorted( BlockInputStreams & streams);
void executePreLimit( BlockInputStreams & streams);
void executeUnion( BlockInputStreams & streams);
void executeLimit( BlockInputStreams & streams);
void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void ignoreWithTotals();
@ -156,9 +177,21 @@ private:
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;
std::unique_ptr<ExpressionAnalyzer> query_analyzer;
BlockInputStreams streams;
NamesAndTypesList table_column_names;
/** Потоки данных.
* Исходные потоки данных получаются в функции executeFetchColumns.
* Затем они преобразуются (оборачиваются в другие потоки) с помощью функций execute*,
* чтобы получить целый конвейер выполнения запроса.
*/
BlockInputStreams streams;
/** При выполнении FULL или RIGHT JOIN, здесь будет поток данных, из которого можно прочитать "неприсоединённые" строки.
* Он имеет особое значение, так как чтение из него должно осуществляться после чтения из основных потоков.
* Он подклеивается к основным потокам в UnionBlockInputStream или ParallelAggregatingBlockInputStream.
*/
BlockInputStreamPtr stream_with_non_joined_data;
/// Являемся ли мы первым запросом SELECT цепочки UNION ALL?
bool is_first_select_inside_union_all;

View File

@ -18,8 +18,9 @@ namespace DB
/** Структура данных для реализации JOIN-а.
* По сути, хэш-таблица: ключи -> строки присоединяемой таблицы.
* Исключение - CROSS JOIN, где вместо хэш-таблицы просто набор блоков без ключей.
*
* JOIN-ы бывают восьми типов: ANY/ALL x LEFT/INNER/RIGHT/FULL.
* JOIN-ы бывают девяти типов: ANY/ALL × LEFT/INNER/RIGHT/FULL, а также CROSS.
*
* Если указано ANY - выбрать из "правой" таблицы только одну, первую попавшуюся строку, даже если там более одной соответствующей строки.
* Если указано ALL - обычный вариант JOIN-а, при котором строки могут размножаться по числу соответствующих строк "правой" таблицы.
@ -213,6 +214,7 @@ private:
KEY_64,
KEY_STRING,
HASHED,
CROSS,
};
Type type = Type::EMPTY;
@ -249,6 +251,8 @@ private:
template <ASTJoin::Kind KIND, ASTJoin::Strictness STRICTNESS, typename Maps>
void joinBlockImpl(Block & block, const Maps & maps) const;
void joinBlockImplCross(Block & block) const;
/// Проверить не превышены ли допустимые размеры множества
bool checkSizeLimits() const;

View File

@ -41,9 +41,14 @@ inline void evaluateMissingDefaults(Block & block,
/// evaluate default values for defaulted columns
ExpressionAnalyzer{default_expr_list, context, {}, required_columns}.getActions(true)->execute(copy_block);
/// move evaluated columns to the original block
/// move evaluated columns to the original block, materializing them at the same time
for (auto & column_name_type : copy_block.getColumns())
{
if (column_name_type.column->isConst())
column_name_type.column = static_cast<const IColumnConst &>(*column_name_type.column).convertToFullColumn();
block.insert(std::move(column_name_type));
}
}
}

View File

@ -32,7 +32,8 @@ public:
Inner, /// Оставить только записи, для которых в "правой" таблице есть соответствующая.
Left, /// Если в "правой" таблице нет соответствующих записей, заполнить столбцы значениями "по-умолчанию".
Right,
Full
Full,
Cross /// Прямое произведение. strictness и using_expr_list не используются.
};
Locality locality = Local;
@ -61,7 +62,8 @@ public:
kind == Inner ? "Inner"
: (kind == Left ? "Left"
: (kind == Right ? "Right"
: "Full")), wb);
: (kind == Full ? "Full"
: "Cross"))), wb);
writeString("Join", wb);
}

View File

@ -50,6 +50,7 @@ public:
ASTPtr select_expression_list;
ASTPtr database;
ASTPtr table; /// Идентификатор, табличная функция или подзапрос (рекурсивно ASTSelectQuery)
bool array_join_is_left = false; /// LEFT ARRAY JOIN
ASTPtr array_join_expression_list; /// ARRAY JOIN
ASTPtr join; /// Обычный (не ARRAY) JOIN.
bool final = false;

View File

@ -2,7 +2,7 @@
#include <DB/DataTypes/IDataType.h>
#include <DB/Parsers/IAST.h>
#include <DB/Parsers/ASTWithAlias.h>
namespace DB
@ -11,12 +11,12 @@ namespace DB
/** Подзарос SELECT
*/
class ASTSubquery : public IAST
class ASTSubquery : public ASTWithAlias
{
public:
ASTSubquery() = default;
ASTSubquery(const StringRange range_) : IAST(range_) {}
ASTSubquery(const StringRange range_) : ASTWithAlias(range_) {}
/** Получить текст, который идентифицирует этот элемент. */
String getID() const override { return "Subquery"; }

View File

@ -779,6 +779,13 @@ public:
return it == std::end(column_sizes) ? 0 : it->second;
}
using ColumnSizes = std::unordered_map<std::string, size_t>;
ColumnSizes getColumnSizes() const
{
Poco::ScopedLock<Poco::FastMutex> lock{data_parts_mutex};
return column_sizes;
}
/// Для ATTACH/DETACH/DROP PARTITION.
static String getMonthName(const Field & partition);
static DayNum_t getMonthDayNum(const Field & partition);
@ -810,7 +817,7 @@ private:
NamesAndTypesListPtr columns;
/// Актуальные размеры столбцов в сжатом виде
std::unordered_map<std::string, size_t> column_sizes;
ColumnSizes column_sizes;
BrokenPartCallback broken_part_callback;

View File

@ -247,7 +247,7 @@ private:
++right;
}
/// Если правее засечек нет, просто используем DEFAULT_BUFFER_SIZE
/// Если правее засечек нет, просто используем max_read_buffer_size
if (right >= (*marks).size() || (right + 1 == (*marks).size() &&
(*marks)[right].offset_in_compressed_file == (*marks)[all_mark_ranges[i].end].offset_in_compressed_file))
{

View File

@ -652,7 +652,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("State")), argument_types, recursion_level + 1);
return new AggregateFunctionState(nested);
}
else if (recursion_level == 0 && name.size() > strlen("Merge") && !(strcmp(name.data() + name.size() - strlen("Merge"), "Merge")))
else if (recursion_level <= 1 && name.size() > strlen("Merge") && !(strcmp(name.data() + name.size() - strlen("Merge"), "Merge")))
{
/// Для агрегатных функций вида aggMerge, где agg - имя другой агрегатной функции.
if (argument_types.size() != 1)
@ -668,7 +668,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
return new AggregateFunctionMerge(nested);
}
else if (recursion_level <= 1 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
else if (recursion_level <= 2 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
{
if (argument_types.empty())
throw Exception{
@ -682,7 +682,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
AggregateFunctionPtr nested = get(String(name.data(), name.size() - 2), nested_dt, recursion_level + 1);
return new AggregateFunctionIf(nested);
}
else if (recursion_level <= 2 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
else if (recursion_level <= 3 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
{
/// Для агрегатных функций вида aggArray, где agg - имя другой агрегатной функции.
size_t num_agruments = argument_types.size();
@ -695,7 +695,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
else
throw Exception("Illegal type " + argument_types[i]->getName() + " of argument #" + toString(i + 1) + " for aggregate function " + name + ". Must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("Array")), nested_arguments, recursion_level + 2); /// + 2, чтобы ни один другой модификатор не мог идти перед Array
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("Array")), nested_arguments, recursion_level + 3); /// + 3, чтобы ни один другой модификатор не мог идти перед Array
return new AggregateFunctionArray(nested);
}
else
@ -765,14 +765,14 @@ bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int
if (recursion_level <= 0 && name.size() > strlen("State") && !(strcmp(name.data() + name.size() - strlen("State"), "State")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("State")), recursion_level + 1);
/// Для агрегатных функций вида aggMerge, где agg - имя другой агрегатной функции.
if (recursion_level <= 0 && name.size() > strlen("Merge") && !(strcmp(name.data() + name.size() - strlen("Merge"), "Merge")))
if (recursion_level <= 1 && name.size() > strlen("Merge") && !(strcmp(name.data() + name.size() - strlen("Merge"), "Merge")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("Merge")), recursion_level + 1);
/// Для агрегатных функций вида aggIf, где agg - имя другой агрегатной функции.
if (recursion_level <= 1 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
if (recursion_level <= 2 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
return isAggregateFunctionName(String(name.data(), name.size() - 2), recursion_level + 1);
/// Для агрегатных функций вида aggArray, где agg - имя другой агрегатной функции.
if (recursion_level <= 2 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("Array")), recursion_level + 2); /// + 2, чтобы ни один другой модификатор не мог идти перед Array
if (recursion_level <= 3 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("Array")), recursion_level + 3); /// + 3, чтобы ни один другой модификатор не мог идти перед Array
return false;
}

View File

@ -369,6 +369,19 @@ private:
}
/** Проверка для случая, когда в терминал вставляется многострочный запрос из буфера обмена.
* Позволяет не начинать выполнение одной строчки запроса, пока весь запрос не будет вставлен.
*/
static bool hasDataInSTDIN()
{
timeval timeout = { 0, 0 };
fd_set fds;
FD_ZERO(&fds);
FD_SET(STDIN_FILENO, &fds);
return select(1, &fds, 0, 0, &timeout) == 1;
}
void loop()
{
String query;
@ -395,7 +408,7 @@ private:
query += line;
if (!ends_with_backslash && (ends_with_semicolon || has_vertical_output_suffix || !config().has("multiline")))
if (!ends_with_backslash && (ends_with_semicolon || has_vertical_output_suffix || (!config().has("multiline") && !hasDataInSTDIN())))
{
if (query != prev_query)
{
@ -464,6 +477,12 @@ private:
copyData(in, out);
}
process(line);
}
bool process(const String & line)
{
if (config().has("multiquery"))
{
/// Несколько запросов, разделенных ';'.
@ -494,17 +513,20 @@ private:
while (isWhitespace(*begin) || *begin == ';')
++begin;
process(query, ast);
if (!processSingleQuery(query, ast))
return false;
}
return true;
}
else
{
process(line);
return processSingleQuery(line);
}
}
bool process(const String & line, ASTPtr parsed_query_ = nullptr)
bool processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr)
{
if (exit_strings.end() != exit_strings.find(line))
return false;
@ -838,15 +860,8 @@ private:
}
void onData(Block & block)
void initBlockOutputStream(const Block & block)
{
if (written_progress_chars)
clearProgress();
if (!block)
return;
processed_rows += block.rows();
if (!block_std_out)
{
String current_format = format;
@ -869,8 +884,21 @@ private:
block_std_out = context.getFormatFactory().getOutput(current_format, std_out, block);
block_std_out->writePrefix();
}
}
/// Загаловочный блок с нулем строк использовался для инициализации block_std_out,
void onData(Block & block)
{
if (written_progress_chars)
clearProgress();
if (!block)
return;
processed_rows += block.rows();
initBlockOutputStream(block);
/// Заголовочный блок с нулем строк использовался для инициализации block_std_out,
/// выводить его не нужно
if (block.rows() != 0)
{
@ -885,11 +913,13 @@ private:
void onTotals(Block & block)
{
initBlockOutputStream(block);
block_std_out->setTotals(block);
}
void onExtremes(Block & block)
{
initBlockOutputStream(block);
block_std_out->setExtremes(block);
}

View File

@ -302,7 +302,13 @@ std::string Block::dumpStructure() const
{
if (it != data.begin())
res << ", ";
res << it->name << ' ' << it->type->getName() << ' ' << it->column->getName() << ' ' << it->column->size();
res << it->name << ' ' << it->type->getName();
if (it->column)
res << ' ' << it->column->getName() << ' ' << it->column->size();
else
res << "nullptr";
}
return res.str();
}

View File

@ -26,7 +26,7 @@ namespace DB
{
BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & buf,
Block & sample, size_t max_block_size) const
const Block & sample, size_t max_block_size) const
{
if (name == "Native")
return new NativeBlockInputStream(buf);
@ -48,7 +48,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf,
Block & sample) const
const Block & sample) const
{
if (name == "Native")
return new NativeBlockOutputStream(buf);

View File

@ -38,7 +38,7 @@ int main(int argc, char ** argv)
streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage2, 1)[0], 30, 2000));
streams.emplace_back(new DB::LimitBlockInputStream(table->read(column_names, 0, DB::Context{}, DB::Settings(), stage3, 1)[0], 30, 100));
DB::UnionBlockInputStream union_stream(streams, 2);
DB::UnionBlockInputStream union_stream(streams, nullptr, 2);
DB::FormatFactory format_factory;
DB::WriteBufferFromFileDescriptor wb(STDERR_FILENO);

View File

@ -44,7 +44,7 @@ int main(int argc, char ** argv)
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = new DB::AsynchronousBlockInputStream(streams[i]);
DB::BlockInputStreamPtr stream = new DB::UnionBlockInputStream(streams, settings.max_threads);
DB::BlockInputStreamPtr stream = new DB::UnionBlockInputStream(streams, nullptr, settings.max_threads);
stream = new DB::LimitBlockInputStream(stream, 10);
DB::FormatFactory format_factory;

View File

@ -13,6 +13,7 @@ void registerFunctionsArray(FunctionFactory & factory)
factory.registerFunction<FunctionCountEqual>();
factory.registerFunction<FunctionArrayEnumerate>();
factory.registerFunction<FunctionArrayEnumerateUniq>();
factory.registerFunction<FunctionArrayUniq>();
factory.registerFunction<FunctionEmptyArrayUInt8>();
factory.registerFunction<FunctionEmptyArrayUInt16>();
factory.registerFunction<FunctionEmptyArrayUInt32>();
@ -26,6 +27,7 @@ void registerFunctionsArray(FunctionFactory & factory)
factory.registerFunction<FunctionEmptyArrayDate>();
factory.registerFunction<FunctionEmptyArrayDateTime>();
factory.registerFunction<FunctionEmptyArrayString>();
factory.registerFunction<FunctionEmptyArrayToSingle>();
factory.registerFunction<FunctionRange>();
}

View File

@ -5,6 +5,7 @@
#include <Poco/SharedPtr.h>
#include <Poco/Mutex.h>
#include <Poco/File.h>
#include <Poco/UUIDGenerator.h>
#include <Yandex/logger_useful.h>
@ -96,6 +97,8 @@ struct ContextShared
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
Poco::SharedPtr<Clusters> clusters;
Poco::UUIDGenerator uuid_generator;
bool shutdown_called = false;
@ -587,8 +590,12 @@ void Context::setCurrentDatabase(const String & name)
void Context::setCurrentQueryId(const String & query_id)
{
String query_id_to_set = query_id;
if (query_id_to_set.empty()) /// Если пользователь не передал свой query_id, то генерируем его самостоятельно.
query_id_to_set = shared->uuid_generator.createRandom().toString();
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
current_query_id = query_id;
current_query_id = query_id_to_set;
}

View File

@ -6,6 +6,7 @@
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/Functions/IFunction.h>
#include <DB/Functions/FunctionsArray.h>
#include <set>
@ -268,6 +269,24 @@ void ExpressionAction::execute(Block & block) const
if (!any_array)
throw Exception("ARRAY JOIN of not array: " + *array_joined_columns.begin(), ErrorCodes::TYPE_MISMATCH);
/// Если LEFT ARRAY JOIN, то создаём столбцы, в которых пустые массивы заменены на массивы с одним элементом - значением по-умолчанию.
std::map<String, ColumnPtr> non_empty_array_columns;
if (array_join_is_left)
{
for (const auto & name : array_joined_columns)
{
auto src_col = block.getByName(name);
Block tmp_block{src_col, {{}, src_col.type, {}}};
FunctionEmptyArrayToSingle().execute(tmp_block, {0}, 1);
non_empty_array_columns[name] = tmp_block.getByPosition(1).column;
}
any_array_ptr = non_empty_array_columns.begin()->second;
any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr);
}
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
{
@ -278,7 +297,8 @@ void ExpressionAction::execute(Block & block) const
if (!typeid_cast<const DataTypeArray *>(&*current.type))
throw Exception("ARRAY JOIN of not array: " + current.name, ErrorCodes::TYPE_MISMATCH);
ColumnPtr array_ptr = current.column;
ColumnPtr array_ptr = array_join_is_left ? non_empty_array_columns[current.name] : current.column;
if (array_ptr->isConst())
array_ptr = dynamic_cast<const IColumnConst &>(*array_ptr).convertToFullColumn();
@ -379,7 +399,7 @@ std::string ExpressionAction::toString() const
break;
case ARRAY_JOIN:
ss << "ARRAY JOIN ";
ss << (array_join_is_left ? "LEFT " : "") << "ARRAY JOIN ";
for (NameSet::const_iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it)
{
if (it != array_joined_columns.begin())
@ -761,7 +781,7 @@ std::string ExpressionActions::getID() const
ss << actions[i].result_name;
if (actions[i].type == ExpressionAction::ARRAY_JOIN)
{
ss << "{";
ss << (actions[i].array_join_is_left ? "LEFT ARRAY JOIN" : "ARRAY JOIN") << "{";
for (NameSet::const_iterator it = actions[i].array_joined_columns.begin();
it != actions[i].array_joined_columns.end(); ++it)
{

View File

@ -87,6 +87,18 @@ const std::unordered_set<String> possibly_injective_function_names
"dictGetDateTime"
};
static bool functionIsInOperator(const String & name)
{
return name == "in" || name == "notIn";
}
static bool functionIsInOrGlobalInOperator(const String & name)
{
return name == "in" || name == "notIn" || name == "globalIn" || name == "globalNotIn";
}
void ExpressionAnalyzer::init()
{
select_query = typeid_cast<ASTSelectQuery *>(&*ast);
@ -95,6 +107,7 @@ void ExpressionAnalyzer::init()
LogicalExpressionsOptimizer logical_expressions_optimizer(select_query, settings);
logical_expressions_optimizer.optimizeDisjunctiveEqualityChains();
/// Добавляет в множество известных алиасов те, которые объявлены в структуре таблицы (ALIAS-столбцы).
addStorageAliases();
/// Создаёт словарь aliases: alias -> ASTPtr
@ -103,6 +116,9 @@ void ExpressionAnalyzer::init()
/// Common subexpression elimination. Rewrite rules.
normalizeTree();
/// Выполнение скалярных подзапросов - замена их на значения-константы.
executeScalarSubqueries();
/// GROUP BY injective function elimination.
optimizeGroupBy();
@ -146,7 +162,10 @@ void ExpressionAnalyzer::analyzeAggregation()
if (select_query && select_query->join)
{
getRootActions(typeid_cast<ASTJoin &>(*select_query->join).using_expr_list, true, false, temp_actions);
auto join = typeid_cast<ASTJoin &>(*select_query->join);
if (join.using_expr_list)
getRootActions(join.using_expr_list, true, false, temp_actions);
addJoinAction(temp_actions, true);
}
@ -385,7 +404,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(
}
/// Может быть указано IN t, где t - таблица, что равносильно IN (SELECT * FROM t).
if (func_node->name == "in" || func_node->name == "notIn" || func_node->name == "globalIn" || func_node->name == "globalNotIn")
if (functionIsInOrGlobalInOperator(func_node->name))
if (ASTIdentifier * right = typeid_cast<ASTIdentifier *>(&*func_node->arguments->children.at(1)))
right->kind = ASTIdentifier::Table;
@ -525,6 +544,145 @@ void ExpressionAnalyzer::normalizeTreeImpl(
finished_asts[initial_ast] = ast;
}
void ExpressionAnalyzer::executeScalarSubqueries()
{
if (!select_query)
executeScalarSubqueriesImpl(ast);
else
{
for (auto & child : ast->children)
{
/// Не опускаемся в FROM и JOIN.
if (child.get() != select_query->table.get() && child.get() != select_query->join.get())
executeScalarSubqueriesImpl(child);
}
}
}
static ASTPtr addTypeConversion(ASTLiteral * ast_, const String & type_name)
{
if (0 == type_name.compare(0, strlen("Array"), "Array"))
return ast_; /// Преобразование типов для массивов пока не поддерживаем.
auto ast = std::unique_ptr<ASTLiteral>(ast_);
ASTFunction * func = new ASTFunction(ast->range);
ASTPtr res = func;
func->alias = ast->alias;
ast->alias.clear();
func->kind = ASTFunction::FUNCTION;
func->name = "to" + type_name;
ASTExpressionList * exp_list = new ASTExpressionList(ast->range);
func->arguments = exp_list;
func->children.push_back(func->arguments);
exp_list->children.push_back(ast.release());
return res;
}
void ExpressionAnalyzer::executeScalarSubqueriesImpl(ASTPtr & ast)
{
/** Заменяем подзапросы, возвращающие ровно одну строку
* ("скалярные" подзапросы) на соответствующие константы.
*
* Если подзапрос возвращает более одного столбца, то он заменяется на кортеж констант.
*
* Особенности:
*
* Замена происходит во время анализа запроса, а не во время основной стадии выполнения.
* Это значит, что не будет работать индикатор прогресса во время выполнения этих запросов,
* а также такие запросы нельзя будет прервать.
*
* Зато результат запросов может быть использован для индекса в таблице.
*
* Скалярные подзапросы выполняются на сервере-инициаторе запроса.
* На удалённые серверы запрос отправляется с уже подставленными константами.
*/
if (ASTSubquery * subquery = typeid_cast<ASTSubquery *>(ast.get()))
{
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 1;
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
ASTPtr query = subquery->children.at(0);
BlockIO res = InterpreterSelectQuery(query, subquery_context, QueryProcessingStage::Complete, subquery_depth + 1).execute();
Block block;
try
{
block = res.in->read();
if (!block)
throw Exception("Scalar subquery returned empty result", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
if (block.rows() != 1 || res.in->read())
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::TOO_MUCH_ROWS)
throw Exception("Scalar subquery returned more than one row", ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY);
else
throw;
}
size_t columns = block.columns();
if (columns == 1)
{
ASTLiteral * lit = new ASTLiteral(ast->range, (*block.getByPosition(0).column)[0]);
lit->alias = subquery->alias;
ast = addTypeConversion(lit, block.getByPosition(0).type->getName());
}
else
{
ASTFunction * tuple = new ASTFunction(ast->range);
tuple->alias = subquery->alias;
ast = tuple;
tuple->kind = ASTFunction::FUNCTION;
tuple->name = "tuple";
ASTExpressionList * exp_list = new ASTExpressionList(ast->range);
tuple->arguments = exp_list;
tuple->children.push_back(tuple->arguments);
exp_list->children.resize(columns);
for (size_t i = 0; i < columns; ++i)
{
exp_list->children[i] = addTypeConversion(
new ASTLiteral(ast->range, (*block.getByPosition(i).column)[0]),
block.getByPosition(i).type->getName());
}
}
}
else
{
/** Не опускаемся в подзапросы в аргументах IN.
* Но если аргумент - не подзапрос, то глубже внутри него могут быть подзапросы, и в них надо опускаться.
*/
ASTFunction * func = typeid_cast<ASTFunction *>(ast.get());
if (func && func->kind == ASTFunction::FUNCTION
&& functionIsInOrGlobalInOperator(func->name))
{
for (auto & child : ast->children)
{
if (child.get() != func->arguments)
executeScalarSubqueriesImpl(child);
else
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
if (i != 1 || !typeid_cast<ASTSubquery *>(func->arguments->children[i].get()))
executeScalarSubqueriesImpl(func->arguments->children[i]);
}
}
else
for (auto & child : ast->children)
executeScalarSubqueriesImpl(child);
}
}
void ExpressionAnalyzer::optimizeGroupBy()
{
if (!(select_query && select_query->group_expression_list))
@ -654,7 +812,7 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(ASTPtr & node, const Block & sampl
makeSetsForIndexImpl(child, sample_block);
ASTFunction * func = typeid_cast<ASTFunction *>(node.get());
if (func && func->kind == ASTFunction::FUNCTION && (func->name == "in" || func->name == "notIn"))
if (func && func->kind == ASTFunction::FUNCTION && functionIsInOperator(func->name))
{
IAST & args = *func->arguments;
ASTPtr & arg = args.children.at(1);
@ -690,7 +848,8 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения
* max_rows_in_set, max_bytes_in_set, set_overflow_mode,
* max_rows_in_join, max_bytes_in_join, join_overflow_mode.
* max_rows_in_join, max_bytes_in_join, join_overflow_mode,
* которые проверяются отдельно (в объектах Set, Join).
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
@ -1210,7 +1369,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
actions_stack.addAction(ExpressionAction::copyColumn(arg->getColumnName(), result_name));
NameSet joined_columns;
joined_columns.insert(result_name);
actions_stack.addAction(ExpressionAction::arrayJoin(joined_columns));
actions_stack.addAction(ExpressionAction::arrayJoin(joined_columns, false));
}
return;
@ -1218,7 +1377,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
if (node->kind == ASTFunction::FUNCTION)
{
if (node->name == "in" || node->name == "notIn" || node->name == "globalIn" || node->name == "globalNotIn")
if (functionIsInOrGlobalInOperator(node->name))
{
if (!no_subqueries)
{
@ -1507,7 +1666,7 @@ void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actio
result_columns.insert(result_source.first);
}
actions->add(ExpressionAction::arrayJoin(result_columns));
actions->add(ExpressionAction::arrayJoin(result_columns, select_query->array_join_is_left));
}
bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types)
@ -1548,7 +1707,8 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
ExpressionActionsChain::Step & step = chain.steps.back();
ASTJoin & ast_join = typeid_cast<ASTJoin &>(*select_query->join);
getRootActions(ast_join.using_expr_list, only_types, false, step.actions);
if (ast_join.using_expr_list)
getRootActions(ast_join.using_expr_list, only_types, false, step.actions);
/// Не поддерживается два JOIN-а с одинаковым подзапросом, но разными USING-ами.
String join_id = ast_join.table->getColumnName();
@ -1888,7 +2048,7 @@ void ExpressionAnalyzer::collectUsedColumns()
}
/* for (const auto & name_type : columns_added_by_join)
std::cerr << "JOINed column (required, not key): " << name_type.first << std::endl;
std::cerr << "JOINed column (required, not key): " << name_type.name << std::endl;
std::cerr << std::endl;*/
/// Вставляем в список требуемых столбцов столбцы, нужные для вычисления ARRAY JOIN.
@ -1968,14 +2128,17 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
nested_result_sample = InterpreterSelectQuery::getSampleBlock(subquery, context);
}
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
for (const auto & key : keys.children)
if (node.using_expr_list)
{
if (!join_key_names_left_set.insert(key->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
for (const auto & key : keys.children)
{
if (!join_key_names_left_set.insert(key->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (!join_key_names_right_set.insert(key->getAliasOrColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
if (!join_key_names_right_set.insert(key->getAliasOrColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
}
for (const auto i : ext::range(0, nested_result_sample.columns()))

View File

@ -7,7 +7,7 @@
#include <DB/IO/WriteHelpers.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTNameTypePair.h>
@ -42,7 +42,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & cont
}
void InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
{
String path = context.getPath();
String current_database = context.getCurrentDatabase();
@ -81,7 +81,7 @@ void InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
if (!create.if_not_exists || !context.isDatabaseExist(database_name))
context.addDatabase(database_name);
return;
return {};
}
SharedPtr<InterpreterSelectQuery> interpreter_select;
@ -118,7 +118,7 @@ void InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
if (context.isTableExist(database_name, table_name))
{
if (create.if_not_exists)
return;
return {};
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
@ -251,9 +251,16 @@ void InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
/// Если запрос CREATE SELECT, то вставим в таблицу данные
if (create.select && storage_name != "View" && (storage_name != "MaterializedView" || create.is_populate))
{
BlockInputStreamPtr from = new MaterializingBlockInputStream(interpreter_select->execute().in);
copyData(*from, *res->write(query_ptr));
BlockIO io;
io.in_sample = select_sample;
io.in = new NullAndDoCopyBlockInputStream(
new MaterializingBlockInputStream(interpreter_select->execute().in),
res->write(query_ptr));
return io;
}
return {};
}
InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(ASTPtr expression_list)

View File

@ -100,6 +100,7 @@ BlockIO InterpreterInsertQuery::execute()
InterpreterSelectQuery interpreter_select{query.select, context};
BlockInputStreamPtr in{interpreter_select.execute().in};
res.in = new NullAndDoCopyBlockInputStream{in, out};
res.in_sample = interpreter_select.getSampleBlock();
}
return res;

View File

@ -16,6 +16,7 @@
#include <DB/DataStreams/CreatingSetsBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
@ -316,7 +317,7 @@ BlockIO InterpreterSelectQuery::execute()
{
(void) executeWithoutUnion();
if (streams.empty())
if (hasNoData())
{
BlockIO res;
res.in = new NullBlockInputStream;
@ -324,14 +325,11 @@ BlockIO InterpreterSelectQuery::execute()
return res;
}
executeUnion(streams);
executeUnion();
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
/// Ограничения действуют только на конечный результат.
if (to_stage == QueryProcessingStage::Complete)
{
@ -365,8 +363,10 @@ const BlockInputStreams & InterpreterSelectQuery::executeWithoutUnion()
streams.insert(streams.end(), others.begin(), others.end());
}
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new MaterializingBlockInputStream(stream);
});
}
else
executeSingleQuery();
@ -391,18 +391,19 @@ void InterpreterSelectQuery::executeSingleQuery()
union_within_single_query = false;
/** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);
QueryProcessingStage::Enum from_stage = executeFetchColumns();
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
if (to_stage > QueryProcessingStage::FetchColumns)
{
bool has_join = false;
bool has_where = false;
bool need_aggregate = false;
bool has_having = false;
bool has_order_by = false;
ExpressionActionsPtr array_join;
ExpressionActionsPtr before_join;
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
@ -430,7 +431,17 @@ void InterpreterSelectQuery::executeSingleQuery()
need_aggregate = query_analyzer->hasAggregation();
query_analyzer->appendArrayJoin(chain, !first_stage);
query_analyzer->appendJoin(chain, !first_stage);
if (query_analyzer->appendJoin(chain, !first_stage))
{
has_join = true;
before_join = chain.getLastActions();
chain.addStep();
auto join = typeid_cast<const ASTJoin &>(*query.join);
if (join.kind == ASTJoin::Full || join.kind == ASTJoin::Right)
stream_with_non_joined_data = before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(settings.max_block_size);
}
if (query_analyzer->appendWhere(chain, !first_stage))
{
@ -475,7 +486,7 @@ void InterpreterSelectQuery::executeSingleQuery()
* чтобы запрос был проанализирован, и в нём могли бы быть обнаружены ошибки (например, несоответствия типов).
* Иначе мог бы вернуться пустой результат на некорректный запрос.
*/
if (streams.empty())
if (hasNoData())
return;
/// Перед выполнением HAVING уберем из блока лишние столбцы (в основном, ключи агрегации).
@ -500,15 +511,19 @@ void InterpreterSelectQuery::executeSingleQuery()
if (first_stage)
{
if (has_join)
for (auto & stream : streams) /// Применяем ко всем источникам кроме stream_with_non_joined_data.
stream = new ExpressionBlockInputStream(stream, before_join);
if (has_where)
executeWhere(streams, before_where);
executeWhere(before_where);
if (need_aggregate)
executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final);
executeAggregation(before_aggregation, aggregate_overflow_row, aggregate_final);
else
{
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
executeExpression(before_order_and_select);
executeDistinct(true, selected_columns);
}
/** При распределённой обработке запроса,
@ -520,13 +535,13 @@ void InterpreterSelectQuery::executeSingleQuery()
&& !need_aggregate && !has_having)
{
if (has_order_by)
executeOrder(streams);
executeOrder();
if (has_order_by && query.limit_length)
executeDistinct(streams, false, selected_columns);
executeDistinct(false, selected_columns);
if (query.limit_length)
executePreLimit(streams);
executePreLimit();
}
}
@ -538,21 +553,21 @@ void InterpreterSelectQuery::executeSingleQuery()
{
/// Если нужно объединить агрегированные результаты с нескольких серверов
if (!first_stage)
executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final);
executeMergeAggregated(aggregate_overflow_row, aggregate_final);
if (!aggregate_final)
executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row);
executeTotalsAndHaving(has_having, before_having, aggregate_overflow_row);
else if (has_having)
executeHaving(streams, before_having);
executeHaving(before_having);
executeExpression(streams, before_order_and_select);
executeDistinct(streams, true, selected_columns);
executeExpression(before_order_and_select);
executeDistinct(true, selected_columns);
need_second_distinct_pass = query.distinct && (streams.size() > 1);
need_second_distinct_pass = query.distinct && hasMoreThanOneStream();
}
else if (query.group_by_with_totals && !aggregate_final)
{
executeTotalsAndHaving(streams, false, nullptr, aggregate_overflow_row);
executeTotalsAndHaving(false, nullptr, aggregate_overflow_row);
}
if (has_order_by)
@ -562,49 +577,53 @@ void InterpreterSelectQuery::executeSingleQuery()
* - поэтому, делаем merge сортированных потоков с удалённых серверов.
*/
if (!first_stage && !need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(streams);
executeMergeSorted();
else /// Иначе просто сортировка.
executeOrder(streams);
executeOrder();
}
executeProjection(streams, final_projection);
executeProjection(final_projection);
/// На этой стадии можно считать минимумы и максимумы, если надо.
if (settings.extremes)
for (auto & stream : streams)
{
transformStreams([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(&*stream))
p_stream->enableExtremes();
});
}
/** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT,
* ограничивающий число записей в каждом до offset + limit.
*/
if (query.limit_length && streams.size() > 1 && !query.distinct)
executePreLimit(streams);
if (query.limit_length && hasMoreThanOneStream() && !query.distinct)
executePreLimit();
if (need_second_distinct_pass)
union_within_single_query = true;
if (union_within_single_query)
executeUnion(streams);
if (union_within_single_query || stream_with_non_joined_data)
executeUnion();
if (streams.size() == 1)
{
/// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния.
if (need_second_distinct_pass)
executeDistinct(streams, false, Names());
executeDistinct(false, Names());
executeLimit(streams);
executeLimit();
}
}
}
/** Если данных нет. */
if (streams.empty())
if (hasNoData())
return;
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets);
executeSubqueriesInSetsAndJoins(subqueries_for_sets);
}
@ -620,9 +639,9 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
}
}
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInputStreams & streams)
QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
{
if (!streams.empty())
if (!hasNoData())
return QueryProcessingStage::FetchColumns;
/// Интерпретатор подзапроса, если подзапрос
@ -741,8 +760,10 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
context, settings_for_storage, from_stage,
settings.max_block_size, max_streams);
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream->addTableLock(table_lock);
});
}
else
{
@ -768,38 +789,36 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
QuotaForIntervals & quota = context.getQuota();
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(&*stream))
{
p_stream->setLimits(limits);
p_stream->setQuota(quota);
}
}
});
}
return from_stage;
}
void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, ExpressionActionsPtr expression)
void InterpreterSelectQuery::executeWhere(ExpressionActionsPtr expression)
{
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new ExpressionBlockInputStream(stream, expression);
stream = new FilterBlockInputStream(stream, query.where_expression->getColumnName());
}
});
}
void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionActionsPtr expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, bool overflow_row, bool final)
{
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new ExpressionBlockInputStream(stream, expression);
}
BlockInputStreamPtr & stream = streams[0];
});
Names key_names;
AggregateDescriptions aggregates;
@ -808,23 +827,37 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
/// Если источников несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
{
stream = new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, overflow_row, final,
streams[0] = new ParallelAggregatingBlockInputStream(streams, stream_with_non_joined_data, key_names, aggregates, overflow_row, final,
settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, settings.group_by_two_level_threshold);
stream_with_non_joined_data = nullptr;
streams.resize(1);
}
else
stream = new AggregatingBlockInputStream(stream, key_names, aggregates, overflow_row, final,
{
BlockInputStreams inputs;
if (!streams.empty())
inputs.push_back(streams[0]);
else
streams.resize(1);
if (stream_with_non_joined_data)
inputs.push_back(stream_with_non_joined_data);
streams[0] = new AggregatingBlockInputStream(new ConcatBlockInputStream(inputs), key_names, aggregates, overflow_row, final,
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, 0);
stream_with_non_joined_data = nullptr;
}
}
void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams, bool overflow_row, bool final)
void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool final)
{
/// Склеим несколько источников в один
executeUnion(streams);
executeUnion();
/// Теперь объединим агрегированные блоки
Names key_names;
@ -834,20 +867,19 @@ void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams,
}
void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, ExpressionActionsPtr expression)
void InterpreterSelectQuery::executeHaving(ExpressionActionsPtr expression)
{
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new ExpressionBlockInputStream(stream, expression);
stream = new FilterBlockInputStream(stream, query.having_expression->getColumnName());
}
});
}
void InterpreterSelectQuery::executeTotalsAndHaving(BlockInputStreams & streams, bool has_having,
ExpressionActionsPtr expression, bool overflow_row)
void InterpreterSelectQuery::executeTotalsAndHaving(bool has_having, ExpressionActionsPtr expression, bool overflow_row)
{
executeUnion(streams);
executeUnion();
Names key_names;
AggregateDescriptions aggregates;
@ -858,12 +890,12 @@ void InterpreterSelectQuery::executeTotalsAndHaving(BlockInputStreams & streams,
}
void InterpreterSelectQuery::executeExpression(BlockInputStreams & streams, ExpressionActionsPtr expression)
void InterpreterSelectQuery::executeExpression(ExpressionActionsPtr expression)
{
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new ExpressionBlockInputStream(stream, expression);
}
});
}
@ -898,12 +930,12 @@ static size_t getLimitForSorting(ASTSelectQuery & query)
}
void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
void InterpreterSelectQuery::executeOrder()
{
SortDescription order_descr = getSortDescription(query);
size_t limit = getLimitForSorting(query);
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
IProfilingBlockInputStream * sorting_stream = new PartialSortingBlockInputStream(stream, order_descr, limit);
@ -916,53 +948,51 @@ void InterpreterSelectQuery::executeOrder(BlockInputStreams & streams)
sorting_stream->setLimits(limits);
stream = sorting_stream;
}
BlockInputStreamPtr & stream = streams[0];
});
/// Если потоков несколько, то объединяем их в один
executeUnion(streams);
executeUnion();
/// Сливаем сортированные блоки.
stream = new MergeSortingBlockInputStream(
stream, order_descr, settings.max_block_size, limit,
streams[0] = new MergeSortingBlockInputStream(
streams[0], order_descr, settings.max_block_size, limit,
settings.limits.max_bytes_before_external_sort, context.getTemporaryPath());
}
void InterpreterSelectQuery::executeMergeSorted(BlockInputStreams & streams)
void InterpreterSelectQuery::executeMergeSorted()
{
SortDescription order_descr = getSortDescription(query);
size_t limit = getLimitForSorting(query);
BlockInputStreamPtr & stream = streams[0];
/// Если потоков несколько, то объединяем их в один
if (streams.size() > 1)
if (hasMoreThanOneStream())
{
/** MergingSortedBlockInputStream читает источники последовательно.
* Чтобы данные на удалённых серверах готовились параллельно, оборачиваем в AsynchronousBlockInputStream.
*/
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new AsynchronousBlockInputStream(stream);
});
/// Сливаем сортированные источники в один сортированный источник.
stream = new MergingSortedBlockInputStream(streams, order_descr, settings.max_block_size, limit);
streams[0] = new MergingSortedBlockInputStream(streams, order_descr, settings.max_block_size, limit);
streams.resize(1);
}
}
void InterpreterSelectQuery::executeProjection(BlockInputStreams & streams, ExpressionActionsPtr expression)
void InterpreterSelectQuery::executeProjection(ExpressionActionsPtr expression)
{
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new ExpressionBlockInputStream(stream, expression);
}
});
}
void InterpreterSelectQuery::executeDistinct(BlockInputStreams & streams, bool before_order, Names columns)
void InterpreterSelectQuery::executeDistinct(bool before_order, Names columns)
{
if (query.distinct)
{
@ -976,31 +1006,38 @@ void InterpreterSelectQuery::executeDistinct(BlockInputStreams & streams, bool b
if (!query.order_expression_list || !before_order)
limit_for_distinct = limit_length + limit_offset;
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new DistinctBlockInputStream(stream, settings.limits, limit_for_distinct, columns);
}
});
if (streams.size() > 1)
if (hasMoreThanOneStream())
union_within_single_query = true;
}
}
void InterpreterSelectQuery::executeUnion(BlockInputStreams & streams)
void InterpreterSelectQuery::executeUnion()
{
/// Если до сих пор есть несколько потоков, то объединяем их в один
if (streams.size() > 1)
if (hasMoreThanOneStream())
{
streams[0] = new UnionBlockInputStream(streams, settings.max_threads);
streams[0] = new UnionBlockInputStream(streams, stream_with_non_joined_data, settings.max_threads);
stream_with_non_joined_data = nullptr;
streams.resize(1);
union_within_single_query = false;
}
else if (stream_with_non_joined_data)
{
streams.push_back(stream_with_non_joined_data);
stream_with_non_joined_data = nullptr;
union_within_single_query = false;
}
}
/// Предварительный LIMIT - применяется в каждом источнике, если источников несколько, до их объединения.
void InterpreterSelectQuery::executePreLimit(BlockInputStreams & streams)
void InterpreterSelectQuery::executePreLimit()
{
size_t limit_length = 0;
size_t limit_offset = 0;
@ -1009,18 +1046,18 @@ void InterpreterSelectQuery::executePreLimit(BlockInputStreams & streams)
/// Если есть LIMIT
if (query.limit_length)
{
for (auto & stream : streams)
transformStreams([&](auto & stream)
{
stream = new LimitBlockInputStream(stream, limit_length + limit_offset, 0);
}
});
if (streams.size() > 1)
if (hasMoreThanOneStream())
union_within_single_query = true;
}
}
void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams)
void InterpreterSelectQuery::executeLimit()
{
size_t limit_length = 0;
size_t limit_offset = 0;
@ -1029,20 +1066,22 @@ void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams)
/// Если есть LIMIT
if (query.limit_length)
{
BlockInputStreamPtr & stream = streams[0];
stream = new LimitBlockInputStream(stream, limit_length, limit_offset);
transformStreams([&](auto & stream)
{
stream = new LimitBlockInputStream(stream, limit_length, limit_offset);
});
}
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets & subqueries_for_sets)
{
/// Если запрос не распределённый, то удалим создание временных таблиц из подзапросов (предназначавшихся для отправки на удалённые серверы).
if (!(storage && storage->isRemote()))
for (auto & elem : subqueries_for_sets)
elem.second.table.reset();
executeUnion(streams);
executeUnion();
streams[0] = new CreatingSetsBlockInputStream(streams[0], subqueries_for_sets, settings.limits);
}

View File

@ -4,6 +4,10 @@
#include <DB/Parsers/ASTJoin.h>
#include <DB/Interpreters/Join.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Core/ColumnNumbers.h>
/*#include <DB/DataStreams/TabSeparatedBlockOutputStream.h>
*#include <DB/IO/WriteBufferFromFileDescriptor.h>*/
namespace DB
@ -18,6 +22,9 @@ Join::Type Join::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & k
size_t keys_bytes = 0;
key_sizes.resize(keys_size);
if (keys_size == 0)
return Type::CROSS;
for (size_t j = 0; j < keys_size; ++j)
{
if (!key_columns[j]->isFixed())
@ -57,6 +64,7 @@ static void initImpl(Maps & maps, Join::Type type)
case Join::Type::KEY_64: maps.key64 .reset(new typename Maps::MapUInt64); break;
case Join::Type::KEY_STRING: maps.key_string .reset(new typename Maps::MapString); break;
case Join::Type::HASHED: maps.hashed .reset(new typename Maps::MapHashed); break;
case Join::Type::CROSS: break;
default:
throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
@ -101,6 +109,9 @@ void Join::init(Type type_)
{
type = type_;
if (kind == ASTJoin::Cross)
return;
if (!getFullness(kind))
{
if (strictness == ASTJoin::Any)
@ -120,21 +131,41 @@ void Join::init(Type type_)
size_t Join::getTotalRowCount() const
{
size_t res = 0;
res += getTotalRowCountImpl(maps_any);
res += getTotalRowCountImpl(maps_all);
res += getTotalRowCountImpl(maps_any_full);
res += getTotalRowCountImpl(maps_all_full);
if (type == Type::CROSS)
{
for (const auto & block : blocks)
res += block.rowsInFirstColumn();
}
else
{
res += getTotalRowCountImpl(maps_any);
res += getTotalRowCountImpl(maps_all);
res += getTotalRowCountImpl(maps_any_full);
res += getTotalRowCountImpl(maps_all_full);
}
return res;
}
size_t Join::getTotalByteCount() const
{
size_t res = 0;
res += getTotalByteCountImpl(maps_any);
res += getTotalByteCountImpl(maps_all);
res += getTotalByteCountImpl(maps_any_full);
res += getTotalByteCountImpl(maps_all_full);
res += pool.size();
if (type == Type::CROSS)
{
for (const auto & block : blocks)
res += block.bytes();
}
else
{
res += getTotalByteCountImpl(maps_any);
res += getTotalByteCountImpl(maps_all);
res += getTotalByteCountImpl(maps_any_full);
res += getTotalByteCountImpl(maps_all_full);
res += pool.size();
}
return res;
}
@ -254,7 +285,11 @@ template <> struct Inserter<ASTJoin::All, Join::MapsAllFull::MapString> : Insert
template <ASTJoin::Strictness STRICTNESS, typename Maps>
void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block)
{
if (type == Type::KEY_64)
if (type == Type::CROSS)
{
/// Ничего не делаем. Уже сохранили блок, и этого достаточно.
}
else if (type == Type::KEY_64)
{
typedef typename Maps::MapUInt64 Map;
Map & res = *maps.key64;
@ -377,9 +412,9 @@ bool Join::insertFromBlock(const Block & block)
{
key_columns[i] = block.getByName(key_names_right[i]).column;
if (key_columns[i]->isConst())
if (auto * col_const = dynamic_cast<const IColumnConst *>(key_columns[i]))
{
materialized_columns.emplace_back(dynamic_cast<const IColumnConst &>(*key_columns[i]).convertToFullColumn());
materialized_columns.emplace_back(col_const->convertToFullColumn());
key_columns[i] = materialized_columns.back();
}
}
@ -389,31 +424,51 @@ bool Join::insertFromBlock(const Block & block)
blocks.push_back(block);
Block * stored_block = &blocks.back();
/// Удаляем из stored_block ключевые столбцы, так как они не нужны.
for (const auto & name : key_names_right)
stored_block->erase(stored_block->getPositionByName(name));
if (getFullness(kind))
{
/// Переносим ключевые столбцы в начало блока.
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block->getPositionByName(name);
ColumnWithTypeAndName col = stored_block->getByPosition(pos);
stored_block->erase(pos);
stored_block->insert(key_num, col);
++key_num;
}
}
else
{
/// Удаляем из stored_block ключевые столбцы, так как они не нужны.
for (const auto & name : key_names_right)
stored_block->erase(stored_block->getPositionByName(name));
}
/// Редкий случай, когда соединяемые столбцы являются константами. Чтобы не поддерживать отдельный код, материализуем их.
for (size_t i = 0, size = stored_block->columns(); i < size; ++i)
{
ColumnPtr col = stored_block->getByPosition(i).column;
if (col->isConst())
stored_block->getByPosition(i).column = dynamic_cast<IColumnConst &>(*col).convertToFullColumn();
if (auto * col_const = dynamic_cast<const IColumnConst *>(col.get()))
stored_block->getByPosition(i).column = col_const->convertToFullColumn();
}
if (!getFullness(kind))
if (kind != ASTJoin::Cross)
{
if (strictness == ASTJoin::Any)
insertFromBlockImpl<ASTJoin::Any>(maps_any, rows, key_columns, keys_size, stored_block);
/// Заполняем нужную хэш-таблицу.
if (!getFullness(kind))
{
if (strictness == ASTJoin::Any)
insertFromBlockImpl<ASTJoin::Any>(maps_any, rows, key_columns, keys_size, stored_block);
else
insertFromBlockImpl<ASTJoin::All>(maps_all, rows, key_columns, keys_size, stored_block);
}
else
insertFromBlockImpl<ASTJoin::All>(maps_all, rows, key_columns, keys_size, stored_block);
}
else
{
if (strictness == ASTJoin::Any)
insertFromBlockImpl<ASTJoin::Any>(maps_any_full, rows, key_columns, keys_size, stored_block);
else
insertFromBlockImpl<ASTJoin::All>(maps_all_full, rows, key_columns, keys_size, stored_block);
{
if (strictness == ASTJoin::Any)
insertFromBlockImpl<ASTJoin::Any>(maps_any_full, rows, key_columns, keys_size, stored_block);
else
insertFromBlockImpl<ASTJoin::All>(maps_all_full, rows, key_columns, keys_size, stored_block);
}
}
if (!checkSizeLimits())
@ -443,7 +498,8 @@ template <typename Map>
struct Adder<ASTJoin::Left, ASTJoin::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
@ -451,7 +507,7 @@ struct Adder<ASTJoin::Left, ASTJoin::Any, Map>
{
it->second.setUsed();
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(num_columns_to_skip + j).column.get(), it->second.row_num);
}
else
{
@ -465,7 +521,8 @@ template <typename Map>
struct Adder<ASTJoin::Inner, ASTJoin::Any, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
@ -475,7 +532,7 @@ struct Adder<ASTJoin::Inner, ASTJoin::Any, Map>
it->second.setUsed();
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(j).column.get(), it->second.row_num);
added_columns[j]->insertFrom(*it->second.block->unsafeGetByPosition(num_columns_to_skip + j).column.get(), it->second.row_num);
}
else
(*filter)[i] = 0;
@ -486,7 +543,8 @@ template <ASTJoin::Kind KIND, typename Map>
struct Adder<KIND, ASTJoin::All, Map>
{
static void add(const Map & map, const typename Map::key_type & key, size_t num_columns_to_add, ColumnPlainPtrs & added_columns,
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets)
size_t i, IColumn::Filter * filter, IColumn::Offset_t & current_offset, IColumn::Offsets_t * offsets,
size_t num_columns_to_skip)
{
typename Map::const_iterator it = map.find(key);
@ -497,7 +555,7 @@ struct Adder<KIND, ASTJoin::All, Map>
for (auto current = &static_cast<const typename Map::mapped_type::Base_t &>(it->second); current != nullptr; current = current->next)
{
for (size_t j = 0; j < num_columns_to_add; ++j)
added_columns[j]->insertFrom(*current->block->unsafeGetByPosition(j).column.get(), current->row_num);
added_columns[j]->insertFrom(*current->block->unsafeGetByPosition(num_columns_to_skip + j).column.get(), current->row_num);
++rows_joined;
}
@ -538,19 +596,34 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
key_columns[i] = block.getByName(key_names_left[i]).column;
if (key_columns[i]->isConst())
if (auto * col_const = dynamic_cast<const IColumnConst *>(key_columns[i]))
{
materialized_columns.emplace_back(dynamic_cast<const IColumnConst &>(*key_columns[i]).convertToFullColumn());
materialized_columns.emplace_back(col_const->convertToFullColumn());
key_columns[i] = materialized_columns.back();
}
}
size_t existing_columns = block.columns();
/** Если используется FULL или RIGHT JOIN, то столбцы из "левой" части надо материализовать.
* Потому что, если они константы, то в "неприсоединённых" строчках, у них могут быть другие значения
* - значения по-умолчанию, которые могут отличаться от значений этих констант.
*/
if (getFullness(kind))
{
for (size_t i = 0; i < existing_columns; ++i)
{
auto & col = block.getByPosition(i).column;
if (auto * col_const = dynamic_cast<IColumnConst *>(col.get()))
col = col_const->convertToFullColumn();
}
}
/// Добавляем в блок новые столбцы.
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
ColumnPlainPtrs added_columns(num_columns_to_add);
size_t existing_columns = block.columns();
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.getByPosition(i);
@ -575,6 +648,16 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
if (strictness == ASTJoin::All)
offsets_to_replicate.reset(new IColumn::Offsets_t(rows));
/** Для LEFT/INNER JOIN, сохранённые блоки не содержат ключи.
* Для FULL/RIGHT JOIN, сохранённые блоки содержат ключи;
* но они не будут использоваться на этой стадии соединения (а будут в AdderNonJoined), и их нужно пропустить.
*/
size_t num_columns_to_skip = 0;
if (getFullness(kind))
num_columns_to_skip = keys_size;
// std::cerr << num_columns_to_skip << "\n" << block.dumpStructure() << "\n" << blocks.front().dumpStructure() << "\n";
if (type == Type::KEY_64)
{
typedef typename Maps::MapUInt64 Map;
@ -586,7 +669,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
/// Строим ключ
UInt64 key = column.get64(i);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else if (type == Type::KEY_STRING)
@ -605,7 +689,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
/// Строим ключ
StringRef key(&data[i == 0 ? 0 : offsets[i - 1]], (i == 0 ? offsets[i] : (offsets[i] - offsets[i - 1])) - 1);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else if (const ColumnFixedString * column_string = typeid_cast<const ColumnFixedString *>(&column))
@ -618,7 +703,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
{
/// Строим ключ
StringRef key(&data[i * n], n);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else
@ -636,7 +722,8 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
? packFixed<UInt128>(i, keys_size, key_columns, key_sizes)
: hash128(i, keys_size, key_columns);
Adder<KIND, STRICTNESS, Map>::add(map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
Adder<KIND, STRICTNESS, Map>::add(
map, key, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), num_columns_to_skip);
}
}
else
@ -654,6 +741,60 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const
}
void Join::joinBlockImplCross(Block & block) const
{
Block res = block.cloneEmpty();
/// Добавляем в блок новые столбцы.
size_t num_existing_columns = res.columns();
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
ColumnPlainPtrs src_left_columns(num_existing_columns);
ColumnPlainPtrs dst_left_columns(num_existing_columns);
ColumnPlainPtrs dst_right_columns(num_columns_to_add);
for (size_t i = 0; i < num_existing_columns; ++i)
{
src_left_columns[i] = block.unsafeGetByPosition(i).column;
dst_left_columns[i] = res.unsafeGetByPosition(i).column;
}
for (size_t i = 0; i < num_columns_to_add; ++i)
{
const ColumnWithTypeAndName & src_column = sample_block_with_columns_to_add.unsafeGetByPosition(i);
ColumnWithTypeAndName new_column = src_column.cloneEmpty();
res.insert(new_column);
dst_right_columns[i] = new_column.column;
}
size_t rows_left = block.rowsInFirstColumn();
/// NOTE Было бы оптимальнее использовать reserve, а также методы replicate для размножения значений левого блока.
for (size_t i = 0; i < rows_left; ++i)
{
for (const Block & block_right : blocks)
{
size_t rows_right = block_right.rowsInFirstColumn();
for (size_t col_num = 0; col_num < num_existing_columns; ++col_num)
for (size_t j = 0; j < rows_right; ++j)
dst_left_columns[col_num]->insertFrom(*src_left_columns[col_num], i);
for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num)
{
const IColumn * column_right = block_right.unsafeGetByPosition(col_num).column;
for (size_t j = 0; j < rows_right; ++j)
dst_right_columns[col_num]->insertFrom(*column_right, j);
}
}
}
block = res;
}
void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right) const
{
size_t keys_size = key_names_left.size();
@ -689,6 +830,10 @@ void Join::joinBlock(Block & block) const
joinBlockImpl<ASTJoin::Left, ASTJoin::All>(block, maps_all_full);
else if (kind == ASTJoin::Right && strictness == ASTJoin::All)
joinBlockImpl<ASTJoin::Inner, ASTJoin::All>(block, maps_all_full);
else if (kind == ASTJoin::Cross)
joinBlockImplCross(block);
else
throw Exception("Logical error: unknown combination of JOIN", ErrorCodes::LOGICAL_ERROR);
}
@ -759,9 +904,45 @@ struct AdderNonJoined<ASTJoin::All, Mapped>
class NonJoinedBlockInputStream : public IProfilingBlockInputStream
{
public:
NonJoinedBlockInputStream(const Join & parent_, Block & left_sample_block_, size_t max_block_size_)
: parent(parent_), left_sample_block(left_sample_block_), max_block_size(max_block_size_)
NonJoinedBlockInputStream(const Join & parent_, Block & left_sample_block, size_t max_block_size_)
: parent(parent_), max_block_size(max_block_size_)
{
/** left_sample_block содержит ключи и "левые" столбцы.
* result_sample_block - ключи, "левые" столбцы и "правые" столбцы.
*/
size_t num_keys = parent.key_names_left.size();
size_t num_columns_left = left_sample_block.columns() - num_keys;
size_t num_columns_right = parent.sample_block_with_columns_to_add.columns();
result_sample_block = left_sample_block;
/// Добавляем в блок новые столбцы.
for (size_t i = 0; i < num_columns_right; ++i)
{
const ColumnWithTypeAndName & src_column = parent.sample_block_with_columns_to_add.getByPosition(i);
ColumnWithTypeAndName new_column = src_column.cloneEmpty();
result_sample_block.insert(new_column);
}
column_numbers_left.reserve(num_columns_left);
column_numbers_keys_and_right.reserve(num_keys + num_columns_right);
for (size_t i = 0; i < num_keys + num_columns_left; ++i)
{
const String & name = left_sample_block.getByPosition(i).name;
if (parent.key_names_left.end() == std::find(parent.key_names_left.begin(), parent.key_names_left.end(), name))
column_numbers_left.push_back(i);
else
column_numbers_keys_and_right.push_back(i);
}
for (size_t i = 0; i < num_columns_right; ++i)
column_numbers_keys_and_right.push_back(num_keys + num_columns_left + i);
columns_left.resize(num_columns_left);
columns_keys_and_right.resize(num_keys + num_columns_right);
}
String getName() const override { return "NonJoined"; }
@ -790,56 +971,59 @@ protected:
private:
const Join & parent;
Block left_sample_block;
size_t max_block_size;
Block result_sample_block;
ColumnNumbers column_numbers_left;
ColumnNumbers column_numbers_keys_and_right;
ColumnPlainPtrs columns_left;
ColumnPlainPtrs columns_keys_and_right;
std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure
template <ASTJoin::Strictness STRICTNESS, typename Maps>
Block createBlock(const Maps & maps)
{
Block block = left_sample_block.cloneEmpty();
Block block = result_sample_block.cloneEmpty();
size_t num_columns_left = left_sample_block.columns();
ColumnPlainPtrs columns_left(num_columns_left);
size_t num_columns_left = column_numbers_left.size();
size_t num_columns_right = column_numbers_keys_and_right.size();
for (size_t i = 0; i < num_columns_left; ++i)
{
auto & column_with_name_and_type = block.getByPosition(i);
auto & column_with_name_and_type = block.getByPosition(column_numbers_left[i]);
column_with_name_and_type.column = column_with_name_and_type.type->createColumn();
columns_left[i] = column_with_name_and_type.column.get();
}
/// Добавляем в блок новые столбцы.
size_t num_columns_right = parent.sample_block_with_columns_to_add.columns();
ColumnPlainPtrs columns_right(num_columns_right);
for (size_t i = 0; i < num_columns_right; ++i)
{
const ColumnWithTypeAndName & src_column = parent.sample_block_with_columns_to_add.getByPosition(i);
ColumnWithTypeAndName new_column = src_column.cloneEmpty();
block.insert(new_column);
columns_right[i] = new_column.column;
columns_right[i]->reserve(src_column.column->size());
auto & column_with_name_and_type = block.getByPosition(column_numbers_keys_and_right[i]);
column_with_name_and_type.column = column_with_name_and_type.type->createColumn();
columns_keys_and_right[i] = column_with_name_and_type.column.get();
columns_keys_and_right[i]->reserve(column_with_name_and_type.column->size());
}
size_t rows_added = 0;
if (parent.type == Join::Type::KEY_64)
rows_added = fillColumns<STRICTNESS>(*maps.key64, num_columns_left, columns_left, num_columns_right, columns_right);
rows_added = fillColumns<STRICTNESS>(*maps.key64, num_columns_left, columns_left, num_columns_right, columns_keys_and_right);
else if (parent.type == Join::Type::KEY_STRING)
rows_added = fillColumns<STRICTNESS>(*maps.key_string, num_columns_left, columns_left, num_columns_right, columns_right);
rows_added = fillColumns<STRICTNESS>(*maps.key_string, num_columns_left, columns_left, num_columns_right, columns_keys_and_right);
else if (parent.type == Join::Type::HASHED)
rows_added = fillColumns<STRICTNESS>(*maps.hashed, num_columns_left, columns_left, num_columns_right, columns_right);
rows_added = fillColumns<STRICTNESS>(*maps.hashed, num_columns_left, columns_left, num_columns_right, columns_keys_and_right);
else
throw Exception("Unknown JOIN variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT);
std::cerr << "rows added: " << rows_added << "\n";
// std::cerr << "rows added: " << rows_added << "\n";
if (!rows_added)
return Block();
std::cerr << block.dumpStructure() << "\n";
/* std::cerr << block.dumpStructure() << "\n";
WriteBufferFromFileDescriptor wb(STDERR_FILENO);
TabSeparatedBlockOutputStream out(wb);
out.write(block);*/
return block;
}
@ -862,7 +1046,7 @@ private:
for (; it != end; ++it)
{
std::cerr << it->second.getUsed() << "\n";
// std::cerr << it->second.getUsed() << "\n";
if (it->second.getUsed())
continue;

View File

@ -166,6 +166,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Держим элемент списка процессов до конца обработки запроса.
res.process_list_entry = process_list_entry;
if (res.in)
{
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());
}
}
quota.addQuery(current_time);
/// Всё, что связано с логом запросов.

View File

@ -556,12 +556,8 @@ bool ParserWithOptionalAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos &
{
String alias_name = typeid_cast<ASTIdentifier &>(*alias_node).name;
if (ASTFunction * func = typeid_cast<ASTFunction *>(&*node))
func->alias = alias_name;
else if (ASTIdentifier * ident = typeid_cast<ASTIdentifier *>(&*node))
ident->alias = alias_name;
else if (ASTLiteral * lit = typeid_cast<ASTLiteral *>(&*node))
lit->alias = alias_name;
if (ASTWithAlias * ast_with_alias = dynamic_cast<ASTWithAlias *>(node.get()))
ast_with_alias->alias = alias_name;
else
{
expected = "alias cannot be here";

View File

@ -24,12 +24,13 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p
ParserString s_left("LEFT", true, true);
ParserString s_right("RIGHT", true, true);
ParserString s_full("FULL", true, true);
ParserString s_cross("CROSS", true, true);
ParserString s_outer("OUTER", true, true);
ParserString s_join("JOIN", true, true);
ParserString s_using("USING", true, true);
ParserNotEmptyExpressionList exp_list;
ParserSubquery subquery;
ParserWithOptionalAlias subquery(ParserPtr(new ParserSubquery));
ParserIdentifier identifier;
ws.ignore(pos, end);
@ -41,15 +42,13 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p
ws.ignore(pos, end);
bool has_strictness = true;
if (s_any.ignore(pos, end))
join->strictness = ASTJoin::Any;
else if (s_all.ignore(pos, end))
join->strictness = ASTJoin::All;
else
{
expected = "ANY|ALL";
return false;
}
has_strictness = false;
ws.ignore(pos, end);
@ -61,16 +60,24 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p
join->kind = ASTJoin::Right;
else if (s_full.ignore(pos, end))
join->kind = ASTJoin::Full;
else if (s_cross.ignore(pos, end))
join->kind = ASTJoin::Cross;
else
{
expected = "INNER|LEFT|RIGHT|FULL";
expected = "INNER|LEFT|RIGHT|FULL|CROSS";
return false;
}
if (!has_strictness && join->kind != ASTJoin::Cross)
throw Exception("You must specify ANY or ALL for JOIN, before INNER or LEFT or RIGHT or FULL.", ErrorCodes::SYNTAX_ERROR);
if (has_strictness && join->kind == ASTJoin::Cross)
throw Exception("You must not specify ANY or ALL for CROSS JOIN.", ErrorCodes::SYNTAX_ERROR);
ws.ignore(pos, end);
/// Для всех JOIN-ов кроме INNER может присутствовать не обязательное слово "OUTER".
if (join->kind != ASTJoin::Inner && s_outer.ignore(pos, end))
/// Для всех JOIN-ов кроме INNER и CROSS может присутствовать не обязательное слово "OUTER".
if (join->kind != ASTJoin::Inner && join->kind != ASTJoin::Cross && s_outer.ignore(pos, end))
ws.ignore(pos, end);
if (!s_join.ignore(pos, end, max_parsed_pos, expected))
@ -84,22 +91,23 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p
ws.ignore(pos, end);
/// Может быть указан алиас. На данный момент, он ничего не значит и не используется.
ParserAlias().ignore(pos, end);
ws.ignore(pos, end);
if (join->kind != ASTJoin::Cross)
{
if (!s_using.ignore(pos, end, max_parsed_pos, expected))
return false;
if (!s_using.ignore(pos, end, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
ws.ignore(pos, end);
if (!exp_list.parse(pos, end, join->using_expr_list, max_parsed_pos, expected))
return false;
if (!exp_list.parse(pos, end, join->using_expr_list, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
ws.ignore(pos, end);
}
join->children.push_back(join->table);
join->children.push_back(join->using_expr_list);
if (join->using_expr_list)
join->children.push_back(join->using_expr_list);
return true;
}

View File

@ -23,6 +23,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ParserString s_select("SELECT", true, true);
ParserString s_distinct("DISTINCT", true, true);
ParserString s_from("FROM", true, true);
ParserString s_left("LEFT", true, true);
ParserString s_array("ARRAY", true, true);
ParserString s_join("JOIN", true, true);
ParserString s_using("USING", true, true);
@ -166,8 +167,22 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
if (!parse_final_and_sample())
return false;
/// ARRAY JOIN expr list
if (s_array.ignore(pos, end, max_parsed_pos, expected))
/// [LEFT] ARRAY JOIN expr list
Pos saved_pos = pos;
bool has_array_join = false;
if (s_left.ignore(pos, end, max_parsed_pos, expected) && ws.ignore(pos, end) && s_array.ignore(pos, end, max_parsed_pos, expected))
{
select_query->array_join_is_left = true;
has_array_join = true;
}
else
{
pos = saved_pos;
if (s_array.ignore(pos, end, max_parsed_pos, expected))
has_array_join = true;
}
if (has_array_join)
{
ws.ignore(pos, end);
@ -182,7 +197,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
}
/// [GLOBAL] ANY|ALL INNER|LEFT JOIN (subquery) USING tuple
/// [GLOBAL] [ANY|ALL] INNER|LEFT|RIGHT|FULL|CROSS [OUTER] JOIN (subquery)|table_name USING tuple
join.parse(pos, end, select_query->join, max_parsed_pos, expected);
if (!parse_final_and_sample())

View File

@ -63,12 +63,24 @@ String backQuoteIfNeed(const String & x)
}
String hightlight(const String & keyword, const String & color_sequence, const bool hilite)
static String hightlight(const String & keyword, const String & color_sequence, const bool hilite)
{
return hilite ? color_sequence + keyword + hilite_none : keyword;
}
static void writeAlias(const String & name, std::ostream & s, bool hilite, bool one_line)
{
s << (hilite ? hilite_keyword : "") << " AS " << (hilite ? hilite_alias : "");
WriteBufferFromOStream wb(s, 32);
writeProbablyBackQuotedString(name, wb);
wb.next();
s << (hilite ? hilite_none : "");
}
void formatAST(const ASTExpressionList & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)
{
for (ASTs::const_iterator it = ast.children.begin(); it != ast.children.end(); ++it)
@ -151,7 +163,9 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo
if (ast.array_join_expression_list)
{
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str << "ARRAY JOIN " << (hilite ? hilite_none : "");
s << (hilite ? hilite_keyword : "") << nl_or_ws << indent_str
<< (ast.array_join_is_left ? "LEFT " : "") << "ARRAY JOIN " << (hilite ? hilite_none : "");
one_line
? formatAST(*ast.array_join_expression_list, s, indent, hilite, one_line)
: formatExpressionListMultiline(typeid_cast<const ASTExpressionList &>(*ast.array_join_expression_list), s, indent, hilite);
@ -245,12 +259,23 @@ void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent, bo
void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)
{
/// Если есть алиас, то требуются скобки вокруг всего выражения, включая алиас. Потому что запись вида 0 AS x + 0 синтаксически некорректна.
if (need_parens && !ast.alias.empty())
s << '(';
std::string indent_str = one_line ? "" : std::string(4 * indent, ' ');
std::string nl_or_nothing = one_line ? "" : "\n";
s << nl_or_nothing << indent_str << "(" << nl_or_nothing;
formatAST(*ast.children[0], s, indent + 1, hilite, one_line);
s << nl_or_nothing << indent_str << ")";
if (!ast.alias.empty())
{
writeAlias(ast.alias, s, hilite, one_line);
if (need_parens)
s << ')';
}
}
void formatAST(const ASTCreateQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)
@ -461,17 +486,6 @@ void formatAST(const ASTInsertQuery & ast, std::ostream & s, size_t indent, bo
}
}
static void writeAlias(const String & name, std::ostream & s, bool hilite, bool one_line)
{
s << (hilite ? hilite_keyword : "") << " AS " << (hilite ? hilite_alias : "");
WriteBufferFromOStream wb(s, 32);
writeProbablyBackQuotedString(name, wb);
wb.next();
s << (hilite ? hilite_none : "");
}
void formatAST(const ASTFunction & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)
{
/// Если есть алиас, то требуются скобки вокруг всего выражения, включая алиас. Потому что запись вида 0 AS x + 0 синтаксически некорректна.
@ -829,21 +843,30 @@ void formatAST(const ASTSet & ast, std::ostream & s, size_t indent, bool hilite,
void formatAST(const ASTJoin & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)
{
s << (hilite ? hilite_keyword : "")
<< (ast.locality == ASTJoin::Global ? "GLOBAL " : "")
<< (ast.strictness == ASTJoin::Any ? "ANY " : "ALL ")
<< (ast.kind == ASTJoin::Inner ? "INNER "
: (ast.kind == ASTJoin::Left ? "LEFT "
: (ast.kind == ASTJoin::Right ? "RIGHT "
: "FULL OUTER ")))
<< "JOIN "
s << (hilite ? hilite_keyword : "");
if (ast.locality == ASTJoin::Global)
s << "GLOBAL ";
if (ast.kind != ASTJoin::Cross)
s << (ast.strictness == ASTJoin::Any ? "ANY " : "ALL ");
s << (ast.kind == ASTJoin::Inner ? "INNER "
: (ast.kind == ASTJoin::Left ? "LEFT "
: (ast.kind == ASTJoin::Right ? "RIGHT "
: (ast.kind == ASTJoin::Cross ? "CROSS "
: "FULL OUTER "))));
s << "JOIN "
<< (hilite ? hilite_none : "");
formatAST(*ast.table, s, indent, hilite, one_line, need_parens);
s << (hilite ? hilite_keyword : "") << " USING " << (hilite ? hilite_none : "");
formatAST(*ast.using_expr_list, s, indent, hilite, one_line, need_parens);
if (ast.kind != ASTJoin::Cross)
{
s << (hilite ? hilite_keyword : "") << " USING " << (hilite ? hilite_none : "");
formatAST(*ast.using_expr_list, s, indent, hilite, one_line, need_parens);
}
}
void formatAST(const ASTCheckQuery & ast, std::ostream & s, size_t indent, bool hilite, bool one_line, bool need_parens)

View File

@ -1,6 +1,10 @@
#include <DB/Storages/StorageSystemColumns.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
@ -15,7 +19,8 @@ StorageSystemColumns::StorageSystemColumns(const std::string & name_)
{ "name", new DataTypeString },
{ "type", new DataTypeString },
{ "default_type", new DataTypeString },
{ "default_expression", new DataTypeString }
{ "default_expression", new DataTypeString },
{ "bytes", new DataTypeUInt64 },
}
{
}
@ -103,6 +108,7 @@ BlockInputStreams StorageSystemColumns::read(
ColumnPtr type_column = new ColumnString;
ColumnPtr default_type_column = new ColumnString;
ColumnPtr default_expression_column = new ColumnString;
ColumnPtr bytes_column = new ColumnUInt64;
size_t rows = filtered_database_column->size();
for (size_t i = 0; i < rows; ++i)
@ -112,6 +118,7 @@ BlockInputStreams StorageSystemColumns::read(
NamesAndTypesList columns;
ColumnDefaults column_defaults;
std::unordered_map<String, size_t> column_sizes;
{
StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
@ -120,6 +127,26 @@ BlockInputStreams StorageSystemColumns::read(
columns = storage->getColumnsList();
columns.insert(std::end(columns), std::begin(storage->alias_columns), std::end(storage->alias_columns));
column_defaults = storage->column_defaults;
/** Данные о размерах столбцов для таблиц семейства MergeTree.
* NOTE: В дальнейшем можно сделать интерфейс, позволяющий получить размеры столбцов у IStorage.
*/
if (auto storage_concrete = dynamic_cast<StorageMergeTree *>(storage.get()))
{
column_sizes = storage_concrete->getData().getColumnSizes();
}
else if (auto storage_concrete = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()))
{
column_sizes = storage_concrete->getData().getColumnSizes();
auto unreplicated_data = storage_concrete->getUnreplicatedData();
if (unreplicated_data)
{
auto unreplicated_column_sizes = unreplicated_data->getColumnSizes();
for (const auto & name_size : unreplicated_column_sizes)
column_sizes[name_size.first] += name_size.second;
}
}
}
for (const auto & column : columns)
@ -129,16 +156,26 @@ BlockInputStreams StorageSystemColumns::read(
name_column->insert(column.name);
type_column->insert(column.type->getName());
const auto it = column_defaults.find(column.name);
if (it == std::end(column_defaults))
{
default_type_column->insertDefault();
default_expression_column->insertDefault();
const auto it = column_defaults.find(column.name);
if (it == std::end(column_defaults))
{
default_type_column->insertDefault();
default_expression_column->insertDefault();
}
else
{
default_type_column->insert(toString(it->second.type));
default_expression_column->insert(queryToString(it->second.expression));
}
}
else
{
default_type_column->insert(toString(it->second.type));
default_expression_column->insert(queryToString(it->second.expression));
const auto it = column_sizes.find(column.name);
if (it == std::end(column_sizes))
bytes_column->insertDefault();
else
bytes_column->insert(it->second);
}
}
}
@ -151,6 +188,7 @@ BlockInputStreams StorageSystemColumns::read(
block.insert(ColumnWithTypeAndName(type_column, new DataTypeString, "type"));
block.insert(ColumnWithTypeAndName(default_type_column, new DataTypeString, "default_type"));
block.insert(ColumnWithTypeAndName(default_expression_column, new DataTypeString, "default_expression"));
block.insert(ColumnWithTypeAndName(bytes_column, new DataTypeUInt64, "bytes"));
return BlockInputStreams{ 1, new OneBlockInputStream(block) };
}

View File

@ -0,0 +1 @@
10 10 10 1297 1297 1299 1299

View File

@ -0,0 +1 @@
SELECT uniqExact(x), length(groupUniqArray(x)), arrayUniq(groupArray(x)), uniqExact(y), arrayUniq(groupArray(y)), uniqExact(concat(toString(x), '_', y)), arrayUniq(groupArray(x), groupArray(y)) FROM (SELECT round(log(intHash32(number))) AS x, toString(round(cbrt(intHash32(number)))) AS y FROM system.numbers LIMIT 10000);

View File

@ -0,0 +1,15 @@
0 0
0 1
0 2
0 3
0 4
1 0
1 1
1 2
1 3
1 4
2 0
2 1
2 2
2 3
2 4

View File

@ -0,0 +1 @@
SELECT x, y FROM (SELECT number AS x FROM system.numbers LIMIT 3) CROSS JOIN (SELECT number AS y FROM system.numbers LIMIT 5);

View File

@ -0,0 +1,40 @@
1 Hello []
2 Hello []
3 Hello [0,1,2]
4 [0,1,2,3]
5 [0,1,2,3,4]
1 Hello
2 Hello
3 Hello
4
5
1 []
2 []
3 [0,1,2]
4 [0,1,2,3]
5 [0,1,2,3,4]
Hello []
Hello []
Hello [0,1,2]
[0,1,2,3]
[0,1,2,3,4]
1
2
3
4
5
3 Hello [0,1,2]
4 [0,1,2,3]
5 [0,1,2,3,4]
3 Hello
4
5
3 [0,1,2]
4 [0,1,2,3]
5 [0,1,2,3,4]
Hello [0,1,2]
[0,1,2,3]
[0,1,2,3,4]
3
4
5

View File

@ -0,0 +1,11 @@
SELECT k, x, y FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY FULL JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k, x FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY FULL JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k, y FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY FULL JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT x, y FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY FULL JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY FULL JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k, x, y FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY RIGHT JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k, x FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY RIGHT JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k, y FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY RIGHT JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT x, y FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY RIGHT JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;
SELECT k FROM (SELECT arrayJoin([1, 2, 3]) AS k, 'Hello' AS x) ANY RIGHT JOIN (SELECT range(k) AS y, arrayJoin([3, 4, 5]) AS k) USING k WHERE k < 10 ORDER BY k;

View File

@ -0,0 +1 @@
SELECT extractURLParameter('http://test.com/?testq=aaa&q=111', 'q');

View File

@ -0,0 +1,5 @@
1 1
1
1 1
('2015-01-02','Hello')
('2015-01-02','Hello') ('2015-01-02','Hello') 1 1

View File

@ -0,0 +1,5 @@
SELECT (SELECT (SELECT (SELECT (SELECT (SELECT count() FROM (SELECT * FROM system.numbers LIMIT 10)))))) = (SELECT 10), ((SELECT 1, 'Hello', [1, 2]).3)[1];
SELECT toUInt64((SELECT 9)) IN (SELECT number FROM system.numbers LIMIT 10);
SELECT (SELECT toDate('2015-01-02')) = toDate('2015-01-02'), 'Hello' = (SELECT 'Hello');
SELECT (SELECT toDate('2015-01-02'), 'Hello');
SELECT (SELECT toDate('2015-01-02'), 'Hello') AS x, x, identity((SELECT 1)), identity((SELECT 1) AS y);

View File

@ -0,0 +1,14 @@
[1,2]
[0]
[4,5,6]
[''] ['0000-00-00'] ['0000-00-00 00:00:00']
[0] [''] ['0000-00-00 00:00:00'] ['0000-00-00']
[0] ['0'] ['2015-01-01 00:00:00'] ['2015-01-01']
[0,1] [''] ['2015-01-01 00:00:00','2015-01-01 00:00:01'] ['2015-01-01','2015-01-02']
[0] ['0'] ['2015-01-01 00:00:00','2015-01-01 00:00:01','2015-01-01 00:00:02'] ['2015-01-01','2015-01-02','2015-01-03']
[0] [''] ['2015-01-01 00:00:00','2015-01-01 00:00:01','2015-01-01 00:00:02','2015-01-01 00:00:03'] ['0000-00-00']
[0,1] ['0'] ['0000-00-00 00:00:00'] ['2015-01-01']
[0] [''] ['2015-01-01 00:00:00'] ['2015-01-01','2015-01-02']
[0] ['0'] ['2015-01-01 00:00:00','2015-01-01 00:00:01'] ['2015-01-01','2015-01-02','2015-01-03']
[0,1] [''] ['2015-01-01 00:00:00','2015-01-01 00:00:01','2015-01-01 00:00:02'] ['0000-00-00']
[0] ['0'] ['2015-01-01 00:00:00','2015-01-01 00:00:01','2015-01-01 00:00:02','2015-01-01 00:00:03'] ['2015-01-01']

View File

@ -0,0 +1,8 @@
SELECT emptyArrayToSingle(arrayFilter(x -> x != 99, arrayJoin([[1, 2], [99], [4, 5, 6]])));
SELECT emptyArrayToSingle(emptyArrayString()), emptyArrayToSingle(emptyArrayDate()), emptyArrayToSingle(emptyArrayDateTime());
SELECT
emptyArrayToSingle(range(number % 3)),
emptyArrayToSingle(arrayMap(x -> toString(x), range(number % 2))),
emptyArrayToSingle(arrayMap(x -> toDateTime('2015-01-01 00:00:00') + x, range(number % 5))),
emptyArrayToSingle(arrayMap(x -> toDate('2015-01-01') + x, range(number % 4))) FROM system.numbers LIMIT 10;

View File

@ -0,0 +1,23 @@
0
1
2
2
3
4
5
5
6
7
0 [] 0
1 [0] 0
2 [0,1] 0
2 [0,1] 1
3 [] 0
4 [0] 0
5 [0,1] 0
5 [0,1] 1
6 [] 0
7 [0] 0
8 [0,1] 0
8 [0,1] 1
9 [] 0

View File

@ -0,0 +1,2 @@
SELECT number FROM system.numbers LEFT ARRAY JOIN range(number % 3) AS arr LIMIT 10;
SELECT number, arr, x FROM (SELECT number, range(number % 3) AS arr FROM system.numbers LIMIT 10) LEFT ARRAY JOIN arr AS x;

View File

@ -0,0 +1,7 @@
0 15 15
1 14 14
2 14 14
3 15 15
4 9 9
5 9 9
6 9 9

View File

@ -0,0 +1 @@
SELECT k % 7 AS k2, finalizeAggregation(uniqMergeState(state)), uniqMerge(state) FROM (SELECT k, uniqState(x) AS state FROM (SELECT number % 11 AS k, intDiv(number, 7) AS x FROM system.numbers LIMIT 100) GROUP BY k) GROUP BY k2 ORDER BY k2;

View File

@ -0,0 +1,11 @@
1
1
0
0
1
1
4 1 1

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS test.test;
CREATE TABLE test.test (x UInt8) ENGINE = Log;
INSERT INTO test.test SELECT 1 AS x;
INSERT INTO test.test SELECT 1 AS x SETTINGS extremes = 1;
INSERT INTO test.test SELECT 1 AS x GROUP BY 1 WITH TOTALS;
INSERT INTO test.test SELECT 1 AS x GROUP BY 1 WITH TOTALS SETTINGS extremes = 1;
SELECT count(), min(x), max(x) FROM test.test;
DROP TABLE test.test;

View File

@ -0,0 +1,6 @@
#!/bin/bash
curl -sS http://localhost:8123/?extremes=1 -d @- <<< "DROP TABLE IF EXISTS test.test"
curl -sS http://localhost:8123/?extremes=1 -d @- <<< "CREATE TABLE test.test (x UInt8) ENGINE = Log"
curl -sS http://localhost:8123/?extremes=1 -d @- <<< "INSERT INTO test.test SELECT 1 AS x"
curl -sS http://localhost:8123/?extremes=1 -d @- <<< "DROP TABLE test.test"