Roman Peshkurov 2015-10-26 10:58:00 +03:00
commit 1aa72b38d0
44 changed files with 3731 additions and 705 deletions

View File

@ -0,0 +1,254 @@
#pragma once
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/Core/Defines.h>
#include <DB/Core/ErrorCodes.h>
namespace DB
{
/** A compact array for storing data whose size L, in bits, is less than one
 * byte. Instead of storing each value in a separate byte, which wastes 37.5%
 * of the space for L=5, CompactArray stores adjacent L-bit values in a byte
 * array, i.e. CompactArray effectively simulates an array of L-bit values.
*/
template <typename BucketIndex, UInt8 content_width, size_t bucket_count>
class __attribute__ ((packed)) CompactArray final
{
public:
class Reader;
class Locus;
public:
CompactArray() = default;
UInt8 ALWAYS_INLINE operator[](BucketIndex bucket_index) const
{
Locus locus(bucket_index);
if (locus.index_l == locus.index_r)
return locus.read(bitset[locus.index_l]);
else
return locus.read(bitset[locus.index_l], bitset[locus.index_r]);
}
Locus ALWAYS_INLINE operator[](BucketIndex bucket_index)
{
Locus locus(bucket_index);
locus.content_l = &bitset[locus.index_l];
if (locus.index_l == locus.index_r)
locus.content_r = locus.content_l;
else
locus.content_r = &bitset[locus.index_r];
return locus;
}
void readText(ReadBuffer & in)
{
for (size_t i = 0; i < BITSET_SIZE; ++i)
{
if (i != 0)
assertString(",", in);
readIntText(bitset[i], in);
}
}
void writeText(WriteBuffer & out) const
{
for (size_t i = 0; i < BITSET_SIZE; ++i)
{
if (i != 0)
writeCString(",", out);
writeIntText(bitset[i], out);
}
}
private:
/// Number of bytes in the bitset.
static constexpr size_t BITSET_SIZE = (static_cast<size_t>(bucket_count) * content_width + 7) / 8;
UInt8 bitset[BITSET_SIZE] = { 0 };
};
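/** Usage sketch (illustrative only, not part of this header): a CompactArray
  * of 5-bit cells packs 8 values into (8 * 5 + 7) / 8 = 5 bytes instead of 8.
  *
  *     DB::CompactArray<UInt32, 5, 8> arr;
  *     arr[0] = 17;        /// non-const operator[] returns a Locus; assignment packs the bits
  *     UInt8 v = arr[0];   /// the Locus converts back to UInt8; v == 17
  */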
/** A class for sequentially reading cells of a compact array from disk.
*/
template <typename BucketIndex, UInt8 content_width, size_t bucket_count>
class CompactArray<BucketIndex, content_width, bucket_count>::Reader final
{
public:
Reader(ReadBuffer & in_)
: in(in_)
{
}
Reader(const Reader &) = delete;
Reader & operator=(const Reader &) = delete;
bool next()
{
if (current_bucket_index == bucket_count)
{
is_eof = true;
return false;
}
locus.init(current_bucket_index);
if (current_bucket_index == 0)
{
in.readStrict(reinterpret_cast<char *>(&value_l), 1);
++read_count;
}
else
value_l = value_r;
if (locus.index_l != locus.index_r)
{
if (read_count == BITSET_SIZE)
fits_in_byte = true;
else
{
fits_in_byte = false;
in.readStrict(reinterpret_cast<char *>(&value_r), 1);
++read_count;
}
}
else
{
fits_in_byte = true;
value_r = value_l;
}
++current_bucket_index;
return true;
}
/** Return the current bucket index and the corresponding content.
*/
inline std::pair<BucketIndex, UInt8> get() const
{
if ((current_bucket_index == 0) || is_eof)
throw Exception("No available data.", ErrorCodes::NO_AVAILABLE_DATA);
if (fits_in_byte)
return std::make_pair(current_bucket_index - 1, locus.read(value_l));
else
return std::make_pair(current_bucket_index - 1, locus.read(value_l, value_r));
}
private:
ReadBuffer & in;
/// Physical location of the current cell.
Locus locus;
/// Current position in the file, expressed as a bucket index.
BucketIndex current_bucket_index = 0;
/// Number of bytes read.
size_t read_count = 0;
/// Content at the current position.
UInt8 value_l;
UInt8 value_r;
/// Has the end of the input been reached?
bool is_eof = false;
/// Does the cell fit entirely within a single byte?
bool fits_in_byte;
};
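/** Reader usage sketch (illustrative; the file name, the widths, and the
  * consume() callback are assumptions for the example):
  *
  *     DB::ReadBufferFromFile in("compact_array.bin");
  *     DB::CompactArray<UInt64, 5, 256>::Reader reader(in);
  *     while (reader.next())
  *     {
  *         const auto cell = reader.get();   /// (bucket index, content)
  *         consume(cell.first, cell.second);
  *     }
  */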
/** The Locus structure contains the information needed to find, for each cell,
 * the corresponding byte and the offset, in bits, from the start of the cell.
 * Since in general the size of one byte is not divisible by the size of one
 * cell, a single cell may straddle two bytes. The Locus structure therefore
 * holds two (index, offset) pairs.
*/
template <typename BucketIndex, UInt8 content_width, size_t bucket_count>
class CompactArray<BucketIndex, content_width, bucket_count>::Locus final
{
friend class CompactArray;
friend class CompactArray::Reader;
public:
ALWAYS_INLINE operator UInt8() const
{
if (content_l == content_r)
return read(*content_l);
else
return read(*content_l, *content_r);
}
Locus ALWAYS_INLINE & operator=(UInt8 content)
{
if ((index_l == index_r) || (index_l == (BITSET_SIZE - 1)))
{
/// The cell fits entirely within a single byte.
*content_l &= ~(((1 << content_width) - 1) << offset_l);
*content_l |= content << offset_l;
}
else
{
/// The cell straddles two bytes.
size_t left = 8 - offset_l;
*content_l &= ~(((1 << left) - 1) << offset_l);
*content_l |= (content & ((1 << left) - 1)) << offset_l;
*content_r &= ~((1 << offset_r) - 1);
*content_r |= content >> left;
}
return *this;
}
private:
Locus() = default;
Locus(BucketIndex bucket_index)
{
init(bucket_index);
}
void ALWAYS_INLINE init(BucketIndex bucket_index)
{
size_t l = static_cast<size_t>(bucket_index) * content_width;
index_l = l >> 3;
offset_l = l & 7;
size_t r = static_cast<size_t>(bucket_index + 1) * content_width;
index_r = r >> 3;
offset_r = r & 7;
}
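/// Worked example (assuming content_width = 5): for bucket_index = 3 we get
/// l = 15 and r = 20, hence index_l = 1, offset_l = 7, index_r = 2, offset_r = 4:
/// the cell starts at bit 7 of byte 1 and ends in byte 2, straddling two bytes.
/// For bucket_index = 0 (l = 0, r = 5) the cell fits entirely within byte 0.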
UInt8 ALWAYS_INLINE read(UInt8 value_l) const
{
/// The cell fits entirely within a single byte.
return (value_l >> offset_l) & ((1 << content_width) - 1);
}
UInt8 ALWAYS_INLINE read(UInt8 value_l, UInt8 value_r) const
{
/// The cell straddles two bytes.
return ((value_l >> offset_l) & ((1 << (8 - offset_l)) - 1))
| ((value_r & ((1 << offset_r) - 1)) << (8 - offset_l));
}
private:
size_t index_l;
size_t offset_l;
size_t index_r;
size_t offset_r;
UInt8 * content_l;
UInt8 * content_r;
/// Sanity checks
static_assert((content_width > 0) && (content_width < 8), "Invalid parameter value");
static_assert(bucket_count <= (std::numeric_limits<size_t>::max() / content_width), "Invalid parameter value");
};
}

View File

@ -3,6 +3,7 @@
#include <common/Common.h>
#include <stats/IntHash.h>
#include <DB/Common/HyperLogLogBiasEstimator.h>
#include <DB/Common/CompactArray.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
@ -61,154 +62,6 @@ template<UInt64 MaxValue> struct MinCounterType
>::Type Type;
};
/** A compact array for storing data whose size L, in bits, is less than one byte.
 * Instead of storing each value in an 8-bit memory cell, which wastes 37.5% of the
 * space for L=5, CompactArray stores adjacent L-bit values - the compact cells -
 * in a byte array, i.e. CompactArray effectively simulates an array of L-bit values.
*/
template<typename BucketIndex, UInt8 content_width, size_t bucket_count>
class __attribute__ ((packed)) CompactArray final
{
public:
class Locus;
public:
CompactArray() = default;
UInt8 ALWAYS_INLINE operator[](BucketIndex bucket_index) const
{
Locus locus(bucket_index);
if (locus.index_l == locus.index_r)
return locus.read(bitset[locus.index_l]);
else
return locus.read(bitset[locus.index_l], bitset[locus.index_r]);
}
Locus ALWAYS_INLINE operator[](BucketIndex bucket_index)
{
Locus locus(bucket_index);
locus.content_l = &bitset[locus.index_l];
if (locus.index_l == locus.index_r)
locus.content_r = locus.content_l;
else
locus.content_r = &bitset[locus.index_r];
return locus;
}
void readText(DB::ReadBuffer & in)
{
for (size_t i = 0; i < BITSET_SIZE; ++i)
{
if (i != 0)
DB::assertString(",", in);
DB::readIntText(bitset[i], in);
}
}
void writeText(DB::WriteBuffer & out) const
{
for (size_t i = 0; i < BITSET_SIZE; ++i)
{
if (i != 0)
writeCString(",", out);
DB::writeIntText(bitset[i], out);
}
}
private:
/// Number of bytes in the bitset.
static constexpr size_t BITSET_SIZE = (static_cast<size_t>(bucket_count) * content_width + 7) / 8;
UInt8 bitset[BITSET_SIZE] = { 0 };
};
/** The Locus structure contains the information needed to find, for each compact
 * cell, the corresponding physical cell and the offset, in bits, from the start
 * of the cell. Since in general the size of one physical cell is not divisible by
 * the size of one compact cell, a single compact cell may straddle two physical
 * cells. The Locus structure therefore holds two (index, offset) pairs.
*/
template<typename BucketIndex, UInt8 content_width, size_t bucket_count>
class CompactArray<BucketIndex, content_width, bucket_count>::Locus final
{
friend class CompactArray;
public:
ALWAYS_INLINE operator UInt8() const
{
if (content_l == content_r)
return read(*content_l);
else
return read(*content_l, *content_r);
}
Locus ALWAYS_INLINE & operator=(UInt8 content)
{
if ((index_l == index_r) || (index_l == (BITSET_SIZE - 1)))
{
/// The compact cell fits entirely within a single physical cell.
*content_l &= ~(((1 << content_width) - 1) << offset_l);
*content_l |= content << offset_l;
}
else
{
/// The compact cell straddles two physical cells.
size_t left = 8 - offset_l;
*content_l &= ~(((1 << left) - 1) << offset_l);
*content_l |= (content & ((1 << left) - 1)) << offset_l;
*content_r &= ~((1 << offset_r) - 1);
*content_r |= content >> left;
}
return *this;
}
private:
Locus() = default;
Locus(BucketIndex bucket_index)
{
size_t l = static_cast<size_t>(bucket_index) * content_width;
index_l = l >> 3;
offset_l = l & 7;
size_t r = static_cast<size_t>(bucket_index + 1) * content_width;
index_r = r >> 3;
offset_r = r & 7;
}
UInt8 ALWAYS_INLINE read(UInt8 value_l) const
{
/// The compact cell fits entirely within a single physical cell.
return (value_l >> offset_l) & ((1 << content_width) - 1);
}
UInt8 ALWAYS_INLINE read(UInt8 value_l, UInt8 value_r) const
{
/// The compact cell straddles two physical cells.
return ((value_l >> offset_l) & ((1 << (8 - offset_l)) - 1))
| ((value_r & ((1 << offset_r) - 1)) << (8 - offset_l));
}
private:
size_t index_l;
size_t offset_l;
size_t index_r;
size_t offset_r;
UInt8 * content_l;
UInt8 * content_r;
/// Sanity checks
static_assert((content_width > 0) && (content_width < 8), "Invalid parameter value");
static_assert(bucket_count <= (std::numeric_limits<size_t>::max() / content_width), "Invalid parameter value");
};
/** The denominator of the formula in the HyperLogLog algorithm
*/
template<UInt8 precision, int max_rank, typename HashValueType, typename DenominatorType,
@ -422,7 +275,7 @@ private:
private:
using Value_t = UInt64;
using RankStore = details::CompactArray<HashValueType, rank_width, bucket_count>;
using RankStore = DB::CompactArray<HashValueType, rank_width, bucket_count>;
public:
void insert(Value_t value)
@ -476,12 +329,11 @@ public:
void readAndMerge(DB::ReadBuffer & in)
{
RankStore other;
in.readStrict(reinterpret_cast<char *>(&other), sizeof(RankStore));
for (HashValueType bucket = 0; bucket < bucket_count; ++bucket)
typename RankStore::Reader reader(in);
while (reader.next())
{
UInt8 rank = other[bucket];
update(bucket, rank);
const auto & data = reader.get();
update(data.first, data.second);
}
in.ignore(sizeof(DenominatorCalculatorType) + sizeof(ZerosCounterType));

View File

@ -297,6 +297,8 @@ namespace ErrorCodes
MONGODB_INIT_FAILED = 293,
INVALID_BLOCK_EXTRA_INFO = 294,
RECEIVED_EMPTY_DATA = 295,
NO_REMOTE_SHARD_FOUND = 296,
SHARD_HAS_NO_CONNECTIONS = 297,
KEEPER_EXCEPTION = 999,
POCO_EXCEPTION = 1000,

View File

@ -97,43 +97,195 @@ struct ConvertImpl<DataTypeDate, DataTypeDateTime, Name>
}
};
/// Implementation of the toDate function.
/** Conversion of a date-with-time to a date: the time component is discarded.
*/
template <typename Name>
struct ConvertImpl<DataTypeDateTime, DataTypeDate, Name>
namespace details { namespace {
template<typename FromType, typename ToType, template <typename, typename> class Transformation>
class Transformer
{
typedef DataTypeDateTime::FieldType FromFieldType;
typedef DataTypeDate::FieldType ToFieldType;
private:
using Op = Transformation<FromType, ToType>;
static void execute(Block & block, const ColumnNumbers & arguments, size_t result)
public:
static void vector_vector(const PODArray<FromType> & vec_from, const ColumnString::Chars_t & data,
const ColumnString::Offsets_t & offsets, PODArray<ToType> & vec_to)
{
const auto & date_lut = DateLUT::instance();
ColumnString::Offset_t prev_offset = 0;
if (const ColumnVector<FromFieldType> * col_from = typeid_cast<const ColumnVector<FromFieldType> *>(&*block.getByPosition(arguments[0]).column))
for (size_t i = 0; i < vec_from.size(); ++i)
{
ColumnVector<ToFieldType> * col_to = new ColumnVector<ToFieldType>;
block.getByPosition(result).column = col_to;
const typename ColumnVector<FromFieldType>::Container_t & vec_from = col_from->getData();
typename ColumnVector<ToFieldType>::Container_t & vec_to = col_to->getData();
size_t size = vec_from.size();
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
vec_to[i] = date_lut.toDayNum(vec_from[i]);
ColumnString::Offset_t cur_offset = offsets[i];
const std::string time_zone(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1);
const auto & remote_date_lut = DateLUT::instance(time_zone);
vec_to[i] = Op::execute(vec_from[i], remote_date_lut);
prev_offset = cur_offset;
}
else if (const ColumnConst<FromFieldType> * col_from = typeid_cast<const ColumnConst<FromFieldType> *>(&*block.getByPosition(arguments[0]).column))
}
static void vector_constant(const PODArray<FromType> & vec_from, const std::string & data,
PODArray<ToType> & vec_to)
{
const auto & remote_date_lut = DateLUT::instance(data);
for (size_t i = 0; i < vec_from.size(); ++i)
vec_to[i] = Op::execute(vec_from[i], remote_date_lut);
}
static void vector_constant(const PODArray<FromType> & vec_from, PODArray<ToType> & vec_to)
{
const auto & local_date_lut = DateLUT::instance();
for (size_t i = 0; i < vec_from.size(); ++i)
vec_to[i] = Op::execute(vec_from[i], local_date_lut);
}
static void constant_vector(const FromType & from, const ColumnString::Chars_t & data,
const ColumnString::Offsets_t & offsets, PODArray<ToType> & vec_to)
{
ColumnString::Offset_t prev_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
block.getByPosition(result).column = new ColumnConst<ToFieldType>(col_from->size(), date_lut.toDayNum(col_from->getData()));
ColumnString::Offset_t cur_offset = offsets[i];
const std::string time_zone(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1);
const auto & remote_date_lut = DateLUT::instance(time_zone);
vec_to[i] = Op::execute(from, remote_date_lut);
prev_offset = cur_offset;
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + Name::name,
ErrorCodes::ILLEGAL_COLUMN);
}
static void constant_constant(const FromType & from, const std::string & data, ToType & to)
{
const auto & remote_date_lut = DateLUT::instance(data);
to = Op::execute(from, remote_date_lut);
}
static void constant_constant(const FromType & from, ToType & to)
{
const auto & local_date_lut = DateLUT::instance();
to = Op::execute(from, local_date_lut);
}
};
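/// A note on the offsets arithmetic in vector_vector/constant_vector above
/// (the sample values are illustrative): ColumnString stores all strings
/// back-to-back, each followed by a terminating zero byte, and offsets[i]
/// points one past the zero byte of string i. The time zone name length is
/// therefore cur_offset - prev_offset - 1. For the values "UTC" and
/// "Europe/Moscow", data holds "UTC\0Europe/Moscow\0" and offsets = {4, 18},
/// so the first name has length 4 - 0 - 1 = 3.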
template <typename FromType, template <typename, typename> class Transformation, typename Name>
class ToDateConverter
{
private:
using FromFieldType = typename FromType::FieldType;
using ToFieldType = typename DataTypeDate::FieldType;
using Op = Transformer<FromFieldType, ToFieldType, Transformation>;
public:
static void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const auto * sources = typeid_cast<const ColumnVector<FromFieldType> *>(&*source_col);
const auto * const_source = typeid_cast<const ColumnConst<FromFieldType> *>(&*source_col);
if (arguments.size() == 1)
{
if (sources)
{
auto * col_to = new ColumnVector<ToFieldType>;
block.getByPosition(result).column = col_to;
const auto & vec_from = sources->getData();
auto & vec_to = col_to->getData();
size_t size = vec_from.size();
vec_to.resize(size);
Op::vector_constant(vec_from, vec_to);
}
else if (const_source)
{
ToFieldType res;
Op::constant_constant(const_source->getData(), res);
block.getByPosition(result).column = new ColumnConst<ToFieldType>(const_source->size(), res);
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of argument of function " + Name::name,
ErrorCodes::ILLEGAL_COLUMN);
}
else if (arguments.size() == 2)
{
const ColumnPtr time_zone_col = block.getByPosition(arguments[1]).column;
const auto * time_zones = typeid_cast<const ColumnString *>(&*time_zone_col);
const auto * const_time_zone = typeid_cast<const ColumnConstString *>(&*time_zone_col);
if (sources)
{
auto * col_to = new ColumnVector<ToFieldType>;
block.getByPosition(result).column = col_to;
auto & vec_from = sources->getData();
auto & vec_to = col_to->getData();
vec_to.resize(vec_from.size());
if (time_zones)
Op::vector_vector(vec_from, time_zones->getChars(), time_zones->getOffsets(), vec_to);
else if (const_time_zone)
Op::vector_constant(vec_from, const_time_zone->getData(), vec_to);
else
throw Exception("Illegal column " + block.getByPosition(arguments[1]).column->getName()
+ " of second argument of function " + Name::name,
ErrorCodes::ILLEGAL_COLUMN);
}
else if (const_source)
{
if (time_zones)
{
auto * col_to = new ColumnVector<ToFieldType>;
block.getByPosition(result).column = col_to;
auto & vec_to = col_to->getData();
vec_to.resize(time_zones->getOffsets().size());
Op::constant_vector(const_source->getData(), time_zones->getChars(), time_zones->getOffsets(), vec_to);
}
else if (const_time_zone)
{
ToFieldType res;
Op::constant_constant(const_source->getData(), const_time_zone->getData(), res);
block.getByPosition(result).column = new ColumnConst<ToFieldType>(const_source->size(), res);
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[1]).column->getName()
+ " of second argument of function " + Name::name,
ErrorCodes::ILLEGAL_COLUMN);
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + Name::name,
ErrorCodes::ILLEGAL_COLUMN);
}
else
throw Exception("FunctionsConversion: Internal error", ErrorCodes::LOGICAL_ERROR);
}
};
template <typename FromType, typename ToType>
struct ToDateTransform
{
static inline ToType execute(const FromType & from, const DateLUTImpl & date_lut)
{
return date_lut.toDayNum(from);
}
};
template <typename FromType, typename ToType>
struct ToDateTransform32Or64
{
static inline ToType execute(const FromType & from, const DateLUTImpl & date_lut)
{
return (from < 0xFFFF) ? from : date_lut.toDayNum(from);
}
};
}}
/** Conversion of a date-with-time to a date: the time component is discarded.
*/
template <typename Name> struct ConvertImpl<DataTypeDateTime, DataTypeDate, Name> : details::ToDateConverter<DataTypeDateTime, details::ToDateTransform, Name> {};
/** A special case for converting (U)Int32 or (U)Int64 to Date.
 * If the number is less than 65536, it is interpreted as a DayNum; if it is greater or equal, as a unix timestamp.
@ -142,56 +294,10 @@ struct ConvertImpl<DataTypeDateTime, DataTypeDate, Name>
* when the user writes toDate(UInt32), expecting it to convert a unix timestamp to a date
* (otherwise such usage would be a common mistake).
*/
template <typename FromDataType, typename Name>
struct ConvertImpl32Or64ToDate
{
typedef typename FromDataType::FieldType FromFieldType;
typedef DataTypeDate::FieldType ToFieldType;
template <typename To, typename From>
static To convert(const From & from, const DateLUTImpl & date_lut)
{
return from < 0xFFFF
? from
: date_lut.toDayNum(from);
}
static void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
const auto & date_lut = DateLUT::instance();
if (const ColumnVector<FromFieldType> * col_from
= typeid_cast<const ColumnVector<FromFieldType> *>(&*block.getByPosition(arguments[0]).column))
{
ColumnVector<ToFieldType> * col_to = new ColumnVector<ToFieldType>;
block.getByPosition(result).column = col_to;
const typename ColumnVector<FromFieldType>::Container_t & vec_from = col_from->getData();
typename ColumnVector<ToFieldType>::Container_t & vec_to = col_to->getData();
size_t size = vec_from.size();
vec_to.resize(size);
for (size_t i = 0; i < size; ++i)
vec_to[i] = convert<ToFieldType>(vec_from[i], date_lut);
}
else if (const ColumnConst<FromFieldType> * col_from
= typeid_cast<const ColumnConst<FromFieldType> *>(&*block.getByPosition(arguments[0]).column))
{
block.getByPosition(result).column = new ColumnConst<ToFieldType>(col_from->size(),
convert<ToFieldType>(col_from->getData(), date_lut));
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + Name::name,
ErrorCodes::ILLEGAL_COLUMN);
}
};
template <typename Name> struct ConvertImpl<DataTypeUInt32, DataTypeDate, Name> : ConvertImpl32Or64ToDate<DataTypeUInt32, Name> {};
template <typename Name> struct ConvertImpl<DataTypeUInt64, DataTypeDate, Name> : ConvertImpl32Or64ToDate<DataTypeUInt64, Name> {};
template <typename Name> struct ConvertImpl<DataTypeInt32, DataTypeDate, Name> : ConvertImpl32Or64ToDate<DataTypeInt32, Name> {};
template <typename Name> struct ConvertImpl<DataTypeInt64, DataTypeDate, Name> : ConvertImpl32Or64ToDate<DataTypeInt64, Name> {};
template <typename Name> struct ConvertImpl<DataTypeUInt32, DataTypeDate, Name> : details::ToDateConverter<DataTypeUInt32, details::ToDateTransform32Or64, Name> {};
template <typename Name> struct ConvertImpl<DataTypeUInt64, DataTypeDate, Name> : details::ToDateConverter<DataTypeUInt64, details::ToDateTransform32Or64, Name> {};
template <typename Name> struct ConvertImpl<DataTypeInt32, DataTypeDate, Name> : details::ToDateConverter<DataTypeInt32, details::ToDateTransform32Or64, Name> {};
template <typename Name> struct ConvertImpl<DataTypeInt64, DataTypeDate, Name> : details::ToDateConverter<DataTypeInt64, details::ToDateTransform32Or64, Name> {};
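/// Illustration of the dual semantics (dates approximate): toDate(16000) is
/// below 65536, so 16000 is taken as a DayNum (a day in late 2013), whereas
/// toDate(1446000000) is taken as a unix timestamp (a moment in late October 2015).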
/** Conversion of numbers, dates and dates-with-times to strings: via formatting.
*/
@ -887,6 +993,8 @@ struct ConvertImpl<DataTypeFixedString, DataTypeString, Name>
}
};
/// Forward declaration.
struct NameToDate { static constexpr auto name = "toDate"; };
template <typename ToDataType, typename Name>
class FunctionConvert : public IFunction
@ -934,7 +1042,9 @@ public:
private:
template<typename ToDataType2 = ToDataType, typename Name2 = Name>
DataTypePtr getReturnTypeImpl(const DataTypes & arguments,
typename std::enable_if<!(std::is_same<ToDataType2, DataTypeString>::value || std::is_same<Name2, NameToUnixTimestamp>::value), void>::type * = nullptr) const
typename std::enable_if<!(std::is_same<ToDataType2, DataTypeString>::value ||
std::is_same<Name2, NameToUnixTimestamp>::value ||
std::is_same<Name2, NameToDate>::value)>::type * = nullptr) const
{
if (arguments.size() != 1)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
@ -960,7 +1070,7 @@ private:
+ toString(arguments.size()) + ", should be 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
else if ((arguments.size()) == 2 && typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr)
else if ((arguments.size() == 2) && (typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
@ -987,7 +1097,7 @@ private:
+ toString(arguments.size()) + ", should be 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
else if ((arguments.size()) == 2 && typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr)
else if ((arguments.size() == 2) && (typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
@ -997,6 +1107,26 @@ private:
return new ToDataType2;
}
template<typename ToDataType2 = ToDataType, typename Name2 = Name>
DataTypePtr getReturnTypeImpl(const DataTypes & arguments,
typename std::enable_if<std::is_same<Name2, NameToDate>::value>::type * = nullptr) const
{
if ((arguments.size() < 1) || (arguments.size() > 2))
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 1 or 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if ((arguments.size() == 2) && (typeid_cast<const DataTypeString *>(&*arguments[1]) == nullptr))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of 2nd argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
return new ToDataType2;
}
};
@ -1140,7 +1270,6 @@ struct NameToInt32 { static constexpr auto name = "toInt32"; };
struct NameToInt64 { static constexpr auto name = "toInt64"; };
struct NameToFloat32 { static constexpr auto name = "toFloat32"; };
struct NameToFloat64 { static constexpr auto name = "toFloat64"; };
struct NameToDate { static constexpr auto name = "toDate"; };
struct NameToDateTime { static constexpr auto name = "toDateTime"; };
struct NameToString { static constexpr auto name = "toString"; };

View File

@ -402,49 +402,8 @@ public:
return name;
}
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
return getReturnTypeImpl(arguments);
}
/// Execute the function over a block.
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
{
IDataType * from_type = &*block.getByPosition(arguments[0]).type;
if (typeid_cast<const DataTypeDate *>(from_type))
DateTimeTransformImpl<DataTypeDate::FieldType, typename ToDataType::FieldType, Transform, Name>::execute(block, arguments, result);
else if (typeid_cast<const DataTypeDateTime * >(from_type))
DateTimeTransformImpl<DataTypeDateTime::FieldType, typename ToDataType::FieldType, Transform, Name>::execute(block, arguments, result);
else
throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
private:
/// Derive the result type from the argument types. If the function is not applicable to the given arguments, throw an exception.
template<typename ToDataType2 = ToDataType, typename Transform2 = Transform>
DataTypePtr getReturnTypeImpl(const DataTypes & arguments,
typename std::enable_if<
!(std::is_same<ToDataType2, DataTypeDate>::value
|| (std::is_same<ToDataType2, DataTypeDateTime>::value && std::is_same<Transform2, ToTimeImpl>::value))
, void>::type * = nullptr) const
{
if (arguments.size() != 1)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 1.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return new ToDataType;
}
template<typename ToDataType2 = ToDataType, typename Transform2 = Transform>
DataTypePtr getReturnTypeImpl(const DataTypes & arguments,
typename std::enable_if<
std::is_same<ToDataType2, DataTypeDate>::value
|| (std::is_same<ToDataType2, DataTypeDateTime>::value && std::is_same<Transform2, ToTimeImpl>::value)
, void>::type * = nullptr) const
DataTypePtr getReturnType(const DataTypes & arguments) const override
{
if ((arguments.size() < 1) || (arguments.size() > 2))
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
@ -468,6 +427,20 @@ private:
return new ToDataType;
}
/// Execute the function over a block.
void execute(Block & block, const ColumnNumbers & arguments, size_t result) override
{
IDataType * from_type = &*block.getByPosition(arguments[0]).type;
if (typeid_cast<const DataTypeDate *>(from_type))
DateTimeTransformImpl<DataTypeDate::FieldType, typename ToDataType::FieldType, Transform, Name>::execute(block, arguments, result);
else if (typeid_cast<const DataTypeDateTime * >(from_type))
DateTimeTransformImpl<DataTypeDateTime::FieldType, typename ToDataType::FieldType, Transform, Name>::execute(block, arguments, result);
else
throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
};

View File

@ -427,7 +427,7 @@ void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf);
inline void readDateTimeText(time_t & datetime, ReadBuffer & buf)
{
/** Read 10 characters that might form a unix timestamp.
 * Only 10-character unix timestamps are supported - starting from September 9, 2001.
 * Only unix timestamps of 5-10 characters are supported.
 * Then look at the fifth character. If it is a digit, parse a unix timestamp.
 * If it is not a digit, parse YYYY-MM-DD hh:mm:ss.
*/
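/// Worked example of the fifth-character check (inputs illustrative): in
/// "2015-01-01 01:02:03" the fifth character is '-', so the YYYY-MM-DD hh:mm:ss
/// branch is taken; in "1446000000" it is '0', a digit, so the input is parsed
/// as a unix timestamp.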

View File

@ -968,6 +968,25 @@ protected:
Table & table_dst,
Table & table_src) const;
/// Merge data from hash table src into dst, but only for keys that already exist in dst. In all other cases, merge the data into overflows.
template <typename Method, typename Table>
void mergeDataNoMoreKeysImpl(
Table & table_dst,
AggregatedDataWithoutKey & overflows,
Table & table_src) const;
/// The same, but ignores the remaining keys.
template <typename Method, typename Table>
void mergeDataOnlyExistingKeysImpl(
Table & table_dst,
Table & table_src) const;
/// Merge all keys remaining after the previous method into overflows.
template <typename Method, typename Table>
void mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const;
void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
@ -1024,13 +1043,24 @@ protected:
bool final,
boost::threadpool::pool * thread_pool) const;
template <bool no_more_keys, typename Method, typename Table>
void mergeStreamsImplCase(
Block & block,
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row) const;
template <typename Method, typename Table>
void mergeStreamsImpl(
Block & block,
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
Table & data) const;
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const;
void mergeWithoutKeyStreamsImpl(
Block & block,
@ -1049,6 +1079,15 @@ protected:
template <typename Method>
void destroyImpl(
Method & method) const;
/** Checks the limit on the maximum number of aggregation keys.
 * If it is exceeded, then, depending on group_by_overflow_mode, the method either
 * - throws an exception;
 * - returns false, meaning that execution must be aborted;
 * - sets the no_more_keys variable to true.
*/
bool checkLimits(size_t result_size, bool & no_more_keys) const;
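/** A minimal sketch of the intended caller-side pattern (illustrative; the
 * real call sites are in Aggregator.cpp):
 *
 *     bool no_more_keys = false;
 *     for (Block & block : blocks)
 *     {
 *         if (!checkLimits(result.sizeWithoutOverflowRow(), no_more_keys))
 *             break;   /// OverflowMode::BREAK: stop consuming input.
 *         /// With no_more_keys == true, only already-present keys are updated
 *         /// (OverflowMode::ANY); OverflowMode::THROW throws inside checkLimits.
 *     }
 */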
};

View File

@ -13,7 +13,7 @@ namespace DB
/// No connection is established to local nodes; the query is executed on them directly.
/// Therefore we store only the number of local nodes.
/// In the config, a cluster consists of <node> or <shard> elements.
class Cluster : private boost::noncopyable
class Cluster
{
public:
Cluster(const Settings & settings, const String & cluster_name);
@ -22,28 +22,13 @@ public:
Cluster(const Settings & settings, std::vector<std::vector<String>> names,
const String & username, const String & password);
/// Number of clickhouse server nodes located locally;
/// local nodes are accessed directly.
size_t getLocalNodesNum() const { return local_nodes_num; }
Cluster(const Cluster &) = delete;
Cluster & operator=(const Cluster &) = delete;
/// Used to impose an upper bound on the timeout.
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
public:
/// Connections to remote servers.
ConnectionPools pools;
struct ShardInfo
{
/// contains names of directories for asynchronous write to StorageDistributed
std::vector<std::string> dir_names;
UInt32 shard_num;
int weight;
size_t num_local_nodes;
};
std::vector<ShardInfo> shard_info_vec;
std::vector<size_t> slot_to_shard;
struct Address
{
/** In the config, addresses are given either in <node> elements:
@ -73,26 +58,59 @@ public:
Address(const String & host_port_, const String & user_, const String & password_);
};
private:
static bool isLocal(const Address & address);
using Addresses = std::vector<Address>;
using AddressesWithFailover = std::vector<Addresses>;
struct ShardInfo
{
public:
bool isLocal() const { return !local_addresses.empty(); }
bool hasRemoteConnections() const { return !pool.isNull(); }
size_t getLocalNodeCount() const { return local_addresses.size(); }
public:
/// contains names of directories for asynchronous write to StorageDistributed
std::vector<std::string> dir_names;
UInt32 shard_num;
int weight;
Addresses local_addresses;
mutable ConnectionPoolPtr pool;
};
using ShardsInfo = std::vector<ShardInfo>;
public:
/// Array of shards. Each shard holds the addresses of a single server.
typedef std::vector<Address> Addresses;
const ShardsInfo & getShardsInfo() const { return shards_info; }
const Addresses & getShardsAddresses() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
/// Array of shards. For each shard, an array of replica addresses (servers considered identical).
typedef std::vector<Addresses> AddressesWithFailover;
const ShardInfo * getAnyRemoteShardInfo() const { return any_remote_shard_info; }
const Addresses & getShardsInfo() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverInfo() const { return addresses_with_failover; }
const Addresses & getLocalShardsInfo() const { return local_addresses; }
/// Number of remote shards.
size_t getRemoteShardCount() const { return remote_shard_count; }
/// Number of clickhouse server nodes located locally;
/// local nodes are accessed directly.
size_t getLocalShardCount() const { return local_shard_count; }
public:
std::vector<size_t> slot_to_shard;
private:
Addresses addresses;
AddressesWithFailover addresses_with_failover;
Addresses local_addresses;
void initMisc();
size_t local_nodes_num = 0;
private:
/// Description of the cluster's shards.
ShardsInfo shards_info;
/// Any remote shard.
ShardInfo * any_remote_shard_info = nullptr;
/// Array of shards. Each shard holds the addresses of a single server.
Addresses addresses;
/// Array of shards. For each shard, an array of replica addresses (servers considered identical).
AddressesWithFailover addresses_with_failover;
size_t remote_shard_count = 0;
size_t local_shard_count = 0;
};
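/// Usage sketch (illustrative; "example_cluster" is an assumed configuration name):
///
///     Cluster cluster(settings, "example_cluster");
///     for (const auto & shard : cluster.getShardsInfo())
///     {
///         if (shard.isLocal())
///             ;   /// execute directly against shard.local_addresses
///         else if (shard.hasRemoteConnections())
///             ;   /// send the query through shard.pool
///     }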
struct Clusters

View File

@ -182,6 +182,9 @@ private:
/// For example, for ARRAY JOIN [1,2] AS b, "b" -> "array(1,2)" ends up here.
NameToNameMap array_join_alias_to_name;
/// The inverse mapping of array_join_alias_to_name.
NameToNameMap array_join_name_to_alias;
/// Whether global subqueries should be prepared for execution while analyzing the query.
bool do_global;

View File

@ -1,13 +1,13 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/ParserQueryWithOutput.h>
namespace DB
{
/** A query of the form
* CHECK [TABLE] [database.]table
*/
class ParserCheckQuery : public IParserBase
class ParserCheckQuery : public ParserQueryWithOutput
{
protected:
const char * getName() const { return "CHECK query"; }

View File

@ -0,0 +1,21 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ASTQueryWithOutput.h>
namespace DB
{
/** A parser for queries that support the FORMAT clause.
*/
class ParserQueryWithOutput : public IParserBase
{
protected:
bool parseFormat(ASTQueryWithOutput & query, Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected);
protected:
ParserWhiteSpaceOrComments ws;
};
}

View File

@ -1,13 +1,13 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/ParserQueryWithOutput.h>
namespace DB
{
class ParserSelectQuery : public IParserBase
class ParserSelectQuery : public ParserQueryWithOutput
{
protected:
const char * getName() const { return "SELECT query"; }

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/ParserQueryWithOutput.h>
#include <DB/Parsers/CommonParsers.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ASTIdentifier.h>
@ -12,7 +13,7 @@ namespace DB
/** The SHOW PROCESSLIST query
*/
class ParserShowProcesslistQuery : public IParserBase
class ParserShowProcesslistQuery : public ParserQueryWithOutput
{
protected:
const char * getName() const { return "SHOW PROCESSLIST query"; }
@ -21,12 +22,11 @@ protected:
{
Pos begin = pos;
ParserWhiteSpaceOrComments ws;
ParserString s_show("SHOW", true, true);
ParserString s_processlist("PROCESSLIST", true, true);
ParserString s_format("FORMAT", true, true);
ASTPtr format;
ASTShowProcesslistQuery * query = new ASTShowProcesslistQuery;
ASTPtr query_ptr = query;
ws.ignore(pos, end);
@ -40,22 +40,12 @@ protected:
ws.ignore(pos, end);
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
/// FORMAT format_name
if (!parseFormat(*query, pos, end, node, max_parsed_pos, expected))
return false;
ParserIdentifier format_p;
if (!format_p.parse(pos, end, format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
ASTShowProcesslistQuery * query = new ASTShowProcesslistQuery(StringRange(begin, pos));
query->format = format;
node = query;
query->range = StringRange(begin, pos);
node = query_ptr;
return true;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/ParserQueryWithOutput.h>
namespace DB
@ -11,7 +11,7 @@ namespace DB
* or
* SHOW DATABASES.
*/
class ParserShowTablesQuery : public IParserBase
class ParserShowTablesQuery : public ParserQueryWithOutput
{
protected:
const char * getName() const { return "SHOW TABLES|DATABASES query"; }

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Parsers/IParserBase.h>
#include <DB/Parsers/ParserQueryWithOutput.h>
#include <DB/Parsers/ExpressionElementParsers.h>
@ -9,7 +10,7 @@ namespace DB
/** A query of the form (EXISTS | SHOW CREATE | (DESCRIBE | DESC)) [TABLE] [db.]name [FORMAT format]
*/
class ParserTablePropertiesQuery : public IParserBase
class ParserTablePropertiesQuery : public ParserQueryWithOutput
{
protected:
const char * getName() const { return "EXISTS, SHOW CREATE or DESCRIBE query"; }

View File

@ -39,7 +39,7 @@ public:
void write(const Block & block) override
{
if (storage.getShardingKeyExpr() && storage.cluster.shard_info_vec.size() > 1)
if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
return writeSplit(block);
writeImpl(block);
@ -50,7 +50,7 @@ private:
static std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
{
const auto total_weight = cluster.slot_to_shard.size();
const auto num_shards = cluster.shard_info_vec.size();
const auto num_shards = cluster.getShardsInfo().size();
std::vector<IColumn::Filter> filters(num_shards);
/** In C++, division with remainder of a negative number by a positive one yields a negative remainder.
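 * Worked example (illustrative): -7 % 3 == -1 in C++, so a non-negative slot
 * is typically recovered as ((x % m) + m) % m, which maps x = -7, m = 3 to 2.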
@ -123,7 +123,7 @@ private:
auto filters = createFilters(block);
const auto num_shards = storage.cluster.shard_info_vec.size();
const auto num_shards = storage.cluster.getShardsInfo().size();
for (size_t i = 0; i < num_shards; ++i)
{
auto target_block = block.cloneEmpty();
@ -138,9 +138,9 @@ private:
void writeImpl(const Block & block, const size_t shard_id = 0)
{
const auto & shard_info = storage.cluster.shard_info_vec[shard_id];
if (shard_info.num_local_nodes)
writeToLocal(block, shard_info.num_local_nodes);
const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())

View File

@ -132,10 +132,15 @@ private:
Settings settings = context.getSettings();
NamesAndTypesList res;
/// Send the query to the first shard we come across
/// Send the query to the first remote shard we come across.
const auto shard_info = cluster.getAnyRemoteShardInfo();
if (shard_info == nullptr)
throw Exception("No remote shard found", ErrorCodes::NO_REMOTE_SHARD_FOUND);
ConnectionPoolPtr pool = shard_info->pool;
BlockInputStreamPtr input{
new RemoteBlockInputStream{
cluster.pools.front().get(), query, &settings, nullptr,
pool.get(), query, &settings, nullptr,
Tables(), QueryProcessingStage::Complete, context}
};
input->readPrefix();

View File

@ -0,0 +1,261 @@
#include <DB/Common/CompactArray.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <boost/filesystem.hpp>
#include <string>
#include <iostream>
#include <fstream>
#include <stdexcept>
#include <unistd.h>
#include <cstdlib>
namespace fs = boost::filesystem;
std::string createTmpPath(const std::string & filename)
{
char pattern[] = "/tmp/fileXXXXXX";
char * dir = mkdtemp(pattern);
if (dir == nullptr)
throw std::runtime_error("Could not create directory");
return std::string(dir) + "/" + filename;
}
template <size_t width, size_t bucket_count, typename Generator>
struct Test
{
static void perform()
{
bool ok = true;
std::string filename;
try
{
using Store = DB::CompactArray<UInt64, width, bucket_count>;
Store store;
for (size_t i = 0; i < bucket_count; ++i)
store[i] = Generator::execute(i, width);
filename = createTmpPath("compact_array.bin");
{
DB::WriteBufferFromFile wb(filename);
wb.write(reinterpret_cast<const char *>(&store), sizeof(store));
}
{
DB::ReadBufferFromFile rb(filename);
typename Store::Reader reader(rb);
while (reader.next())
{
const auto & data = reader.get();
if (data.second != store[data.first])
throw std::runtime_error("Found discrepancy");
}
}
}
catch (const Poco::Exception & ex)
{
std::cout << "Test width=" << width << " bucket_count=" << bucket_count << " failed "
<< "(Error: " << ex.what() << ": " << ex.displayText() << ")\n";
ok = false;
}
catch (const std::runtime_error & ex)
{
std::cout << "Test width=" << width << " bucket_count=" << bucket_count << " failed "
<< "(Error: " << ex.what() << ")\n";
ok = false;
}
catch (...)
{
std::cout << "Test width=" << width << " bucket_count=" << bucket_count << " failed\n";
ok = false;
}
fs::remove_all(fs::path(filename).parent_path().string());
if (ok)
std::cout << "Test width=" << width << " bucket_count=" << bucket_count << " passed\n";
}
};
template <typename Generator>
struct TestSet
{
static void execute()
{
Test<1, 1, Generator>::perform();
Test<1, 2, Generator>::perform();
Test<1, 3, Generator>::perform();
Test<1, 4, Generator>::perform();
Test<1, 5, Generator>::perform();
Test<1, 6, Generator>::perform();
Test<1, 7, Generator>::perform();
Test<1, 8, Generator>::perform();
Test<1, 9, Generator>::perform();
Test<1, 10, Generator>::perform();
Test<1, 16, Generator>::perform();
Test<1, 32, Generator>::perform();
Test<1, 64, Generator>::perform();
Test<1, 128, Generator>::perform();
Test<1, 256, Generator>::perform();
Test<1, 512, Generator>::perform();
Test<1, 1024, Generator>::perform();
Test<2, 1, Generator>::perform();
Test<2, 2, Generator>::perform();
Test<2, 3, Generator>::perform();
Test<2, 4, Generator>::perform();
Test<2, 5, Generator>::perform();
Test<2, 6, Generator>::perform();
Test<2, 7, Generator>::perform();
Test<2, 8, Generator>::perform();
Test<2, 9, Generator>::perform();
Test<2, 10, Generator>::perform();
Test<2, 16, Generator>::perform();
Test<2, 32, Generator>::perform();
Test<2, 64, Generator>::perform();
Test<2, 128, Generator>::perform();
Test<2, 256, Generator>::perform();
Test<2, 512, Generator>::perform();
Test<2, 1024, Generator>::perform();
Test<3, 1, Generator>::perform();
Test<3, 2, Generator>::perform();
Test<3, 3, Generator>::perform();
Test<3, 4, Generator>::perform();
Test<3, 5, Generator>::perform();
Test<3, 6, Generator>::perform();
Test<3, 7, Generator>::perform();
Test<3, 8, Generator>::perform();
Test<3, 9, Generator>::perform();
Test<3, 10, Generator>::perform();
Test<3, 16, Generator>::perform();
Test<3, 32, Generator>::perform();
Test<3, 64, Generator>::perform();
Test<3, 128, Generator>::perform();
Test<3, 256, Generator>::perform();
Test<3, 512, Generator>::perform();
Test<3, 1024, Generator>::perform();
Test<4, 1, Generator>::perform();
Test<4, 2, Generator>::perform();
Test<4, 3, Generator>::perform();
Test<4, 4, Generator>::perform();
Test<4, 5, Generator>::perform();
Test<4, 6, Generator>::perform();
Test<4, 7, Generator>::perform();
Test<4, 8, Generator>::perform();
Test<4, 9, Generator>::perform();
Test<4, 10, Generator>::perform();
Test<4, 16, Generator>::perform();
Test<4, 32, Generator>::perform();
Test<4, 64, Generator>::perform();
Test<4, 128, Generator>::perform();
Test<4, 256, Generator>::perform();
Test<4, 512, Generator>::perform();
Test<4, 1024, Generator>::perform();
Test<5, 1, Generator>::perform();
Test<5, 2, Generator>::perform();
Test<5, 3, Generator>::perform();
Test<5, 4, Generator>::perform();
Test<5, 5, Generator>::perform();
Test<5, 6, Generator>::perform();
Test<5, 7, Generator>::perform();
Test<5, 8, Generator>::perform();
Test<5, 9, Generator>::perform();
Test<5, 10, Generator>::perform();
Test<5, 16, Generator>::perform();
Test<5, 32, Generator>::perform();
Test<5, 64, Generator>::perform();
Test<5, 128, Generator>::perform();
Test<5, 256, Generator>::perform();
Test<5, 512, Generator>::perform();
Test<5, 1024, Generator>::perform();
Test<6, 1, Generator>::perform();
Test<6, 2, Generator>::perform();
Test<6, 3, Generator>::perform();
Test<6, 4, Generator>::perform();
Test<6, 5, Generator>::perform();
Test<6, 6, Generator>::perform();
Test<6, 7, Generator>::perform();
Test<6, 8, Generator>::perform();
Test<6, 9, Generator>::perform();
Test<6, 10, Generator>::perform();
Test<6, 16, Generator>::perform();
Test<6, 32, Generator>::perform();
Test<6, 64, Generator>::perform();
Test<6, 128, Generator>::perform();
Test<6, 256, Generator>::perform();
Test<6, 512, Generator>::perform();
Test<6, 1024, Generator>::perform();
Test<7, 1, Generator>::perform();
Test<7, 2, Generator>::perform();
Test<7, 3, Generator>::perform();
Test<7, 4, Generator>::perform();
Test<7, 5, Generator>::perform();
Test<7, 6, Generator>::perform();
Test<7, 7, Generator>::perform();
Test<7, 8, Generator>::perform();
Test<7, 9, Generator>::perform();
Test<7, 10, Generator>::perform();
Test<7, 16, Generator>::perform();
Test<7, 32, Generator>::perform();
Test<7, 64, Generator>::perform();
Test<7, 128, Generator>::perform();
Test<7, 256, Generator>::perform();
Test<7, 512, Generator>::perform();
Test<7, 1024, Generator>::perform();
}
};
struct Generator1
{
static UInt8 execute(size_t i, size_t width)
{
return (1 << width) - 1;
}
};
struct Generator2
{
static UInt8 execute(size_t i, size_t width)
{
return (i >> 1) & ((1 << width) - 1);
}
};
struct Generator3
{
static UInt8 execute(size_t i, size_t width)
{
return (i * 17 + 31) % (1ULL << width);
}
};
void runTests()
{
std::cout << "Test set 1\n";
TestSet<Generator1>::execute();
std::cout << "Test set 2\n";
TestSet<Generator2>::execute();
std::cout << "Test set 3\n";
TestSet<Generator3>::execute();
}
int main()
{
runTests();
return 0;
}

View File

@ -265,21 +265,28 @@ void readBackQuotedString(String & s, ReadBuffer & buf)
void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf)
{
char s[19];
static constexpr auto DATE_TIME_BROKEN_DOWN_LENGTH = 19;
static constexpr auto UNIX_TIMESTAMP_MAX_LENGTH = 10;
size_t size = buf.read(s, 10);
if (10 != size)
char s[DATE_TIME_BROKEN_DOWN_LENGTH];
char * s_pos = s;
/// A chunk that looks like a unix timestamp.
while (s_pos < s + UNIX_TIMESTAMP_MAX_LENGTH && !buf.eof() && *buf.position() >= '0' && *buf.position() <= '9')
{
s[size] = 0;
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
*s_pos = *buf.position();
++s_pos;
++buf.position();
}
if (s[4] < '0' || s[4] > '9')
/// 2015-01-01 01:02:03
if (s_pos == s + 4 && !buf.eof() && (*buf.position() < '0' || *buf.position() > '9'))
{
size_t size = buf.read(&s[10], 9);
if (9 != size)
const size_t remaining_size = DATE_TIME_BROKEN_DOWN_LENGTH - (s_pos - s);
size_t size = buf.read(s_pos, remaining_size);
if (remaining_size != size)
{
s[10 + size] = 0;
s_pos[size] = 0;
throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME);
}
@ -297,7 +304,7 @@ void readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf)
datetime = DateLUT::instance().makeDateTime(year, month, day, hour, minute, second);
}
else
datetime = parse<time_t>(s, 10);
datetime = parse<time_t>(s, s_pos - s);
}
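/// Examples of inputs accepted by this fallback (illustrative):
///   "1446000000"          -> 5-10 digits, parsed as a unix timestamp;
///   "2015-01-01 01:02:03" -> the digits stop after 4 characters at '-', so the
///                            remaining broken-down representation is read.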

View File

@ -715,6 +715,15 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
result.convertToTwoLevel();
/// Check the limits.
if (!checkLimits(result_size, no_more_keys))
return false;
return true;
}
bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
{
if (!no_more_keys && max_rows_to_group_by && result_size > max_rows_to_group_by)
{
if (group_by_overflow_mode == OverflowMode::THROW)
@ -1226,6 +1235,86 @@ void NO_INLINE Aggregator::mergeDataImpl(
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
Table & table_dst,
AggregatedDataWithoutKey & overflows,
Table & table_src) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
decltype(it) res_it = table_dst.find(it->first, it.getHash());
AggregateDataPtr res_data = table_dst.end() == res_it
? overflows
: Method::getAggregateData(res_it->second);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr;
}
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
Table & table_dst,
Table & table_src) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
decltype(it) res_it = table_dst.find(it->first, it.getHash());
if (table_dst.end() == res_it)
continue;
AggregateDataPtr res_data = Method::getAggregateData(res_it->second);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr;
}
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl(
AggregatedDataWithoutKey & overflows,
Table & table_src) const
{
for (auto it = table_src.begin(); it != table_src.end(); ++it)
{
if (Method::getAggregateData(it->second) == nullptr)
continue;
AggregateDataPtr res_data = overflows;
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
Method::getAggregateData(it->second) = nullptr;
}
}
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
{
@ -1253,15 +1342,25 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
{
AggregatedDataVariantsPtr & res = non_empty_data[0];
bool no_more_keys = false;
/// Merge all aggregation results into the first one.
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
break;
AggregatedDataVariants & current = *non_empty_data[i];
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data);
if (!no_more_keys)
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data);
else
mergeDataNoMoreKeysImpl<Method>(
getDataVariant<Method>(*res).data,
res->without_key,
getDataVariant<Method>(current).data);
/// current will not destroy the states of the aggregate functions in its destructor
current.aggregator = nullptr;
@ -1276,8 +1375,13 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
{
AggregatedDataVariantsPtr & res = non_empty_data[0];
/// In this case, no_more_keys is set only if the first (largest) state contains sufficiently many rows.
bool no_more_keys = false;
if (!checkLimits(res->sizeWithoutOverflowRow(), no_more_keys))
return;
/// The merge is parallelized across buckets - the first level of the TwoLevelHashMap.
auto merge_bucket = [&non_empty_data, &res, this](size_t bucket, MemoryTracker * memory_tracker)
auto merge_bucket = [&non_empty_data, &res, no_more_keys, this](size_t bucket, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
@ -1286,12 +1390,18 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
{
AggregatedDataVariants & current = *non_empty_data[i];
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
/// current will not destroy the states of the aggregate functions in its destructor
current.aggregator = nullptr;
if (!no_more_keys)
{
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
else
{
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket]);
}
}
};
@ -1326,6 +1436,25 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
for (auto & task : tasks)
if (task.valid())
task.get_future().get();
if (no_more_keys && overflow_row)
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
{
AggregatedDataVariants & current = *non_empty_data[i];
mergeDataRemainingKeysToOverflowsImpl<Method>(
res->without_key,
getDataVariant<Method>(current).data.impls[bucket]);
}
}
}
/// aggregator will not destroy the states of the aggregate functions in its destructor
for (size_t i = 1, size = non_empty_data.size(); i < size; ++i)
non_empty_data[i]->aggregator = nullptr;
}
@ -1350,6 +1479,13 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
if (non_empty_data.size() == 1)
return non_empty_data[0];
/// Sort the states in order of decreasing size, so that the merge is more efficient (since all states are merged into the first one).
std::sort(non_empty_data.begin(), non_empty_data.end(),
[](const AggregatedDataVariantsPtr & lhs, const AggregatedDataVariantsPtr & rhs)
{
return lhs->sizeWithoutOverflowRow() > rhs->sizeWithoutOverflowRow();
});
/// If at least one of the variants is two-level, convert all the variants that are not two-level yet.
/// Note: it might be more optimal not to convert single-level variants before the merge, but to merge them separately at the end.
@ -1448,13 +1584,14 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
Block & block,
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
Table & data) const
Table & data,
AggregateDataPtr overflow_row) const
{
ConstColumnPlainPtrs key_columns(keys_size);
AggregateColumnsData aggregate_columns(aggregates_size);
@ -1475,13 +1612,33 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
for (size_t i = 0; i < rows; ++i)
{
typename Table::iterator it;
bool inserted; /// Was a new key inserted, or did such a key already exist?
bool overflow = false; /// The new key did not fit into the hash table because of no_more_keys.
/// Obtain the key for insertion into the hash table.
auto key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool);
data.emplace(key, it, inserted);
if (!no_more_keys)
{
data.emplace(key, it, inserted);
}
else
{
inserted = false;
it = data.find(key);
if (data.end() == it)
overflow = true;
}
/// If the key did not fit and the data does not need to be aggregated into a separate row, there is nothing to do.
if (no_more_keys && overflow && !overflow_row)
{
method.onExistingKey(key, keys, *aggregates_pool);
continue;
}
/// If a new key was inserted, initialize the aggregate function states and, possibly, some key-related data.
if (inserted)
{
AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
@ -1496,10 +1653,12 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
else
method.onExistingKey(key, keys, *aggregates_pool);
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
/// Merge the states of the aggregate functions.
for (size_t j = 0; j < aggregates_size; ++j)
aggregate_functions[j]->merge(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[j],
value + offsets_of_aggregate_states[j],
(*aggregate_columns[j])[i]);
}
@ -1507,6 +1666,23 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
block.clear();
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
Block & block,
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const
{
if (!no_more_keys)
mergeStreamsImplCase<false>(block, key_sizes, aggregates_pool, method, data, overflow_row);
else
mergeStreamsImplCase<true>(block, key_sizes, aggregates_pool, method, data, overflow_row);
}
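For illustration only (not part of this commit): the dispatch pattern used in mergeStreamsImpl above, where a runtime flag is forwarded into a bool template parameter so the per-row branch is resolved at compile time, can be sketched in isolation as follows (all names hypothetical).

    #include <cstddef>
    #include <iostream>

    /// The flag becomes a compile-time constant inside the hot loop,
    /// so the compiler eliminates the dead branch in each specialization.
    template <bool no_more_keys>
    static void processCase(const int * rows, std::size_t count)
    {
        for (std::size_t i = 0; i < count; ++i)
        {
            if (!no_more_keys)
                std::cout << "insert " << rows[i] << '\n';
            else
                std::cout << "lookup " << rows[i] << '\n';
        }
    }

    /// The runtime bool is examined once, outside the loop.
    static void process(const int * rows, std::size_t count, bool no_more_keys)
    {
        if (!no_more_keys)
            processCase<false>(rows, count);
        else
            processCase<true>(rows, count);
    }

    int main()
    {
        const int rows[] = {1, 2, 3};
        process(rows, 3, false);
        process(rows, 3, true);
    }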
void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
Block & block,
AggregatedDataVariants & result) const
@ -1621,6 +1797,11 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
/// First, merge the individual buckets in parallel. Then finish merging the data that is not distributed across buckets.
if (has_two_level)
{
/** In this case, no_more_keys is not supported, because it is hard to update
 * the shared state for the "remaining" keys (overflows) from different threads.
 * As a result, the number of keys may end up significantly larger than max_rows_to_group_by.
 */
LOG_TRACE(log, "Merging partially aggregated two-level data.");
auto merge_bucket = [&bucket_to_blocks, &result, &key_sizes, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
@ -1634,7 +1815,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket]);
mergeStreamsImpl(block, key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -1691,6 +1872,8 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
{
LOG_TRACE(log, "Merging partially aggregated single-level data.");
bool no_more_keys = false;
BlocksList & blocks = bucket_to_blocks[-1];
for (Block & block : blocks)
{
@ -1700,12 +1883,15 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
return;
}
if (!checkLimits(result.sizeWithoutOverflowRow(), no_more_keys))
break;
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeWithoutKeyStreamsImpl(block, result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data);
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -1760,7 +1946,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data);
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M


@ -8,34 +8,67 @@
namespace DB
{
namespace
{
/// The default shard weight.
static constexpr int default_weight = 1;
inline bool isLocal(const Cluster::Address & address)
{
/// If there is a replica such that:
/// - its port matches the port the server is listening on;
/// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
/// then this shard must always be accessed locally, without inter-process communication.
return isLocalAddress(address.resolved_address);
}
inline std::string addressToDirName(const Cluster::Address & address)
{
return
escapeForFileName(address.user) +
(address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
escapeForFileName(address.resolved_address.host().toString()) + ':' +
std::to_string(address.resolved_address.port());
}
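For illustration only (not part of this commit): a simplified standalone sketch of the directory-name scheme composed above; escapeForFileName is omitted here, while the real function additionally escapes characters that are unsafe in file names.

    #include <iostream>
    #include <string>

    /// Simplified analogue of addressToDirName, without escaping.
    std::string addressToDirNameSketch(const std::string & user, const std::string & password,
                                       const std::string & host, unsigned port)
    {
        return user
            + (password.empty() ? "" : (':' + password)) + '@'
            + host + ':' + std::to_string(port);
    }

    int main()
    {
        std::cout << addressToDirNameSketch("default", "", "localhost", 9000) << '\n';
        /// prints: default@localhost:9000
    }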
inline bool beginsWith(const std::string & str1, const char * str2)
{
if (str2 == nullptr)
throw Exception("Passed null pointer to function beginsWith", ErrorCodes::LOGICAL_ERROR);
return 0 == strncmp(str1.data(), str2, strlen(str2));
}
/// For caching DNS requests.
static Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
{
return Poco::Net::SocketAddress(host, port);
}
static Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
{
return Poco::Net::SocketAddress(host_and_port);
}
static Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
{
static SimpleCache<decltype(resolveSocketAddressImpl1), &resolveSocketAddressImpl1> cache;
return cache(host, port);
}
static Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
{
static SimpleCache<decltype(resolveSocketAddressImpl2), &resolveSocketAddressImpl2> cache;
return cache(host_and_port);
}
}
/// Implementation of the Cluster::Address class
Cluster::Address::Address(const String & config_prefix)
{
auto & config = Poco::Util::Application::instance().config();
const auto & config = Poco::Util::Application::instance().config();
host_name = config.getString(config_prefix + ".host");
port = config.getInt(config_prefix + ".port");
@ -51,7 +84,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
UInt16 default_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
/// It looks like the host_port_ string contains a port. Even if the condition holds, it does not necessarily mean a port is present (example: [::]).
if (nullptr != strchr(host_port_.c_str(), ':') || !default_port)
if ((nullptr != strchr(host_port_.c_str(), ':')) || !default_port)
{
resolved_address = resolveSocketAddress(host_port_);
host_name = host_port_.substr(0, host_port_.find(':'));
@ -65,19 +98,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
}
}
namespace
{
inline std::string addressToDirName(const Cluster::Address & address)
{
return
escapeForFileName(address.user) +
(address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
escapeForFileName(address.resolved_address.host().toString()) + ':' +
std::to_string(address.resolved_address.port());
}
}
/// Implementation of the Clusters class
Clusters::Clusters(const Settings & settings, const String & config_name)
{
@ -85,17 +106,16 @@ Clusters::Clusters(const Settings & settings, const String & config_name)
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
for (const auto & key : config_keys)
impl.emplace(std::piecewise_construct,
std::forward_as_tuple(*it),
std::forward_as_tuple(settings, config_name + "." + *it));
std::forward_as_tuple(key),
std::forward_as_tuple(settings, config_name + "." + key));
}
/// Implementation of the Cluster class
Cluster::Cluster(const Settings & settings, const String & cluster_name)
{
/// Create the cluster.
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);
@ -104,35 +124,56 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
UInt32 current_shard_num = 1;
for (auto it = config_keys.begin(); it != config_keys.end(); ++it)
for (const auto & key : config_keys)
{
if (0 == strncmp(it->c_str(), "node", strlen("node")))
if (beginsWith(key, "node"))
{
const auto & prefix = config_prefix + *it;
const auto weight = config.getInt(prefix + ".weight", 1);
/// A shard without replicas.
const auto & prefix = config_prefix + key;
const auto weight = config.getInt(prefix + ".weight", default_weight);
if (weight == 0)
continue;
addresses.emplace_back(prefix);
addresses.back().replica_num = 1;
const auto & address = addresses.back();
slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
if (const auto is_local = isLocal(addresses.back()))
shard_info_vec.push_back({{}, current_shard_num, weight, is_local});
ShardInfo info;
info.shard_num = current_shard_num;
info.weight = weight;
if (isLocal(address))
info.local_addresses.push_back(address);
else
shard_info_vec.push_back({{addressToDirName(addresses.back())}, current_shard_num, weight, is_local});
{
info.dir_names.push_back(addressToDirName(address));
info.pool = new ConnectionPool(
settings.distributed_connections_pool_size,
address.host_name, address.port, address.resolved_address,
"", address.user, address.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time));
}
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back(info);
}
else if (0 == strncmp(it->c_str(), "shard", strlen("shard")))
else if (beginsWith(key, "shard"))
{
/// A shard with replicas.
Poco::Util::AbstractConfiguration::Keys replica_keys;
config.keys(config_prefix + *it, replica_keys);
config.keys(config_prefix + key, replica_keys);
addresses_with_failover.emplace_back();
Addresses & replica_addresses = addresses_with_failover.back();
UInt32 current_replica_num = 1;
const auto & partial_prefix = config_prefix + *it + ".";
const auto weight = config.getInt(partial_prefix + ".weight", 1);
const auto & partial_prefix = config_prefix + key + ".";
const auto weight = config.getInt(partial_prefix + ".weight", default_weight);
if (weight == 0)
continue;
@ -142,26 +183,20 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
* the first element of the vector; otherwise we will just .emplace_back
*/
std::vector<std::string> dir_names{};
size_t num_local_nodes = 0;
auto first = true;
for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt)
for (const auto & replica_key : replica_keys)
{
if (0 == strncmp(jt->data(), "weight", strlen("weight")) ||
0 == strncmp(jt->data(), "internal_replication", strlen("internal_replication")))
if (beginsWith(replica_key, "weight") || beginsWith(replica_key, "internal_replication"))
continue;
if (0 == strncmp(jt->c_str(), "replica", strlen("replica")))
if (beginsWith(replica_key, "replica"))
{
replica_addresses.emplace_back(partial_prefix + *jt);
replica_addresses.emplace_back(partial_prefix + replica_key);
replica_addresses.back().replica_num = current_replica_num;
++current_replica_num;
if (isLocal(replica_addresses.back()))
{
++num_local_nodes;
}
else
if (!isLocal(replica_addresses.back()))
{
if (internal_replication)
{
@ -178,40 +213,18 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
}
}
else
throw Exception("Unknown element in config: " + *jt, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
throw Exception("Unknown element in config: " + replica_key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
shard_info_vec.push_back({std::move(dir_names), current_shard_num, weight, num_local_nodes});
}
else
throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
Addresses shard_local_addresses;
++current_shard_num;
}
/// Create the corresponding connection pools.
if (!addresses_with_failover.empty() && !addresses.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (!addresses_with_failover.empty())
{
for (const auto & shard : addresses_with_failover)
{
ConnectionPools replicas;
replicas.reserve(shard.size());
replicas.reserve(replica_addresses.size());
bool has_local_replica = false;
for (const auto & replica : shard)
for (const auto & replica : replica_addresses)
{
if (isLocal(replica))
{
has_local_replica = true;
local_addresses.push_back(replica);
break;
}
shard_local_addresses.push_back(replica);
else
{
replicas.emplace_back(new ConnectionPool(
@ -225,42 +238,31 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
}
}
if (has_local_replica)
++local_nodes_num;
else
pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
ConnectionPoolPtr shard_pool;
if (!replicas.empty())
shard_pool = new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool});
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (!addresses_with_failover.empty() && !addresses.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
++current_shard_num;
}
else if (!addresses.empty())
{
for (const auto & address : addresses)
{
if (isLocal(address))
{
local_addresses.push_back(address);
++local_nodes_num;
}
else
{
pools.emplace_back(new ConnectionPool(
settings.distributed_connections_pool_size,
address.host_name, address.port, address.resolved_address,
"", address.user, address.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
}
}
}
else
throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
initMisc();
}
Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names,
const String & username, const String & password)
{
UInt32 current_shard_num = 1;
for (const auto & shard : names)
{
Addresses current;
@ -284,8 +286,14 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> nam
saturate(settings.send_timeout, settings.limits.max_execution_time)));
}
pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
ConnectionPoolPtr shard_pool = new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
shards_info.push_back({{}, current_shard_num, default_weight, {}, shard_pool});
++current_shard_num;
}
initMisc();
}
@ -294,17 +302,35 @@ Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan
if (limit.totalMicroseconds() == 0)
return v;
else
return v > limit ? limit : v;
return (v > limit) ? limit : v;
}
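For illustration only (not part of this commit): the saturation rule above in isolation. A zero limit means "no limit"; otherwise the value is clamped to the limit, so a socket timeout never exceeds max_execution_time.

    #include <algorithm>
    #include <iostream>

    /// Simplified analogue of Cluster::saturate for plain integers.
    long long saturateSketch(long long v, long long limit)
    {
        return limit == 0 ? v : std::min(v, limit);
    }

    int main()
    {
        std::cout << saturateSketch(10, 3) << '\n';   /// 3: clamped to the limit
        std::cout << saturateSketch(10, 0) << '\n';   /// 10: zero limit means no limit
    }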
bool Cluster::isLocal(const Address & address)
void Cluster::initMisc()
{
/// If there is a replica such that:
/// - its port matches the port the server is listening on;
/// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
/// then this shard must always be accessed locally, without inter-process communication.
return isLocalAddress(address.resolved_address);
for (const auto & shard_info : shards_info)
{
if (!shard_info.isLocal() && !shard_info.hasRemoteConnections())
throw Exception("Found shard without any specified connection",
ErrorCodes::SHARD_HAS_NO_CONNECTIONS);
}
for (const auto & shard_info : shards_info)
{
if (shard_info.isLocal())
++local_shard_count;
else
++remote_shard_count;
}
for (auto & shard_info : shards_info)
{
if (!shard_info.isLocal())
{
any_remote_shard_info = &shard_info;
break;
}
}
}
}


@ -148,9 +148,9 @@ void ExpressionAction::prepare(Block & sample_block)
case ARRAY_JOIN:
{
for (NameSet::iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it)
for (const auto & name : array_joined_columns)
{
ColumnWithTypeAndName & current = sample_block.getByName(*it);
ColumnWithTypeAndName & current = sample_block.getByName(name);
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*current.type);
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
@ -214,6 +214,7 @@ void ExpressionAction::prepare(Block & sample_block)
}
}
void ExpressionAction::execute(Block & block) const
{
// std::cerr << "executing: " << toString() << std::endl;
@ -261,9 +262,11 @@ void ExpressionAction::execute(Block & block) const
{
if (array_joined_columns.empty())
throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
ColumnPtr any_array_ptr = block.getByName(*array_joined_columns.begin()).column;
if (any_array_ptr->isConst())
any_array_ptr = dynamic_cast<const IColumnConst &>(*any_array_ptr).convertToFullColumn();
const ColumnArray * any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr);
if (!any_array)
throw Exception("ARRAY JOIN of not array: " + *array_joined_columns.begin(), ErrorCodes::TYPE_MISMATCH);
@ -552,19 +555,22 @@ bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, Expre
{
if (actions.empty() || actions.back().type != ExpressionAction::ARRAY_JOIN)
return false;
NameSet required_set(required_columns.begin(), required_columns.end());
for (const std::string & name : actions.back().array_joined_columns)
{
if (required_set.count(name))
return false;
}
for (const std::string & name : actions.back().array_joined_columns)
{
DataTypePtr & type = sample_block.getByName(name).type;
type = new DataTypeArray(type);
}
out_action = actions.back();
actions.pop_back();
return true;
}
@ -1015,7 +1021,11 @@ void ExpressionActionsChain::finalize()
steps[i].actions->finalize(required_output);
}
/// When possible, move ARRAY JOIN from earlier steps to later ones.
/** When possible, move ARRAY JOIN from earlier steps to later ones.
 * Note: this is usually beneficial, because ARRAY JOIN is a heavy operation that, as a rule, increases the data volume.
 * But not always: if most of the arrays are empty, ARRAY JOIN, on the contrary, reduces the data volume,
 * and it would be beneficial to perform it earlier. That case is not considered here.
 */
for (size_t i = 1; i < steps.size(); ++i)
{
ExpressionAction action;
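For illustration only (not part of this commit): a standalone sketch of why ARRAY JOIN usually increases the data volume, as the comment above notes. Each source row is repeated once per array element and dropped when the array is empty.

    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
        std::vector<std::vector<int>> arrays{{1, 2, 3}, {}, {4}};

        for (std::size_t row = 0; row < arrays.size(); ++row)
            for (int value : arrays[row])
                std::cout << "row " << row << " -> " << value << '\n';

        /// 3 input rows become 3 + 0 + 1 = 4 output rows; with mostly
        /// empty arrays the result would shrink instead, as noted above.
    }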


@ -211,13 +211,17 @@ void ExpressionAnalyzer::analyzeAggregation()
/// constant expressions have non-null column pointer at this stage
if (const auto is_constexpr = col.column)
{
if (i < group_asts.size() - 1)
group_asts[i] = std::move(group_asts.back());
/// but don't remove the last key column if there are no aggregate functions; otherwise aggregation will not work
if (!aggregate_descriptions.empty() || group_asts.size() > 1)
{
if (i < group_asts.size() - 1)
group_asts[i] = std::move(group_asts.back());
group_asts.pop_back();
i -= 1;
group_asts.pop_back();
i -= 1;
continue;
continue;
}
}
NameAndTypePair key{column_name, col.type};
@ -781,7 +785,26 @@ void ExpressionAnalyzer::optimizeGroupBy()
}
if (group_exprs.empty())
select_query->group_expression_list = nullptr;
{
/** GROUP BY cannot be removed entirely, because if there were no aggregate functions either, it would turn out that there is no aggregation at all.
 * Instead, keep a GROUP BY over a constant.
 * See also the removal of constants in the analyzeAggregation method.
 */
/// We need to insert a constant that is not the name of a table column. Such a case is rare, but it happens.
UInt64 unused_column = 0;
String unused_column_name = toString(unused_column);
while (columns.end() != std::find_if(columns.begin(), columns.end(),
[&unused_column_name](const NameAndTypePair & name_type) { return name_type.name == unused_column_name; }))
{
++unused_column;
unused_column_name = toString(unused_column);
}
select_query->group_expression_list = new ASTExpressionList;
select_query->group_expression_list->children.push_back(new ASTLiteral(StringRange(), UInt64(unused_column)));
}
}
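For illustration only (not part of this commit): the collision-avoidance loop above in isolation. It picks the smallest unsigned integer whose decimal representation does not coincide with any existing column name, so the inserted GROUP BY constant cannot shadow a real column.

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    int main()
    {
        const std::vector<std::string> columns{"0", "1", "x"};

        unsigned long long unused = 0;
        while (columns.end() != std::find(columns.begin(), columns.end(), std::to_string(unused)))
            ++unused;

        std::cout << unused << '\n';   /// prints 2: "0" and "1" are taken by column names
    }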
@ -1384,12 +1407,15 @@ void ExpressionAnalyzer::getArrayJoinedColumns()
{
const String nested_table_name = ast->getColumnName();
const String nested_table_alias = ast->getAliasOrColumnName();
if (nested_table_alias == nested_table_name && !typeid_cast<const ASTIdentifier *>(&*ast))
throw Exception("No alias for non-trivial value in ARRAY JOIN: " + nested_table_name, ErrorCodes::ALIAS_REQUIRED);
if (array_join_alias_to_name.count(nested_table_alias) || aliases.count(nested_table_alias))
throw Exception("Duplicate alias " + nested_table_alias, ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS);
array_join_alias_to_name[nested_table_alias] = nested_table_name;
array_join_name_to_alias[nested_table_name] = nested_table_alias;
}
ASTs & query_asts = select_query->children;
@ -1432,6 +1458,7 @@ void ExpressionAnalyzer::getArrayJoinedColumns()
}
/// Fills array_join_result_to_source: which array columns to unroll, and what to name them afterwards.
void ExpressionAnalyzer::getArrayJoinedColumnsImpl(ASTPtr ast)
{
if (ASTIdentifier * node = typeid_cast<ASTIdentifier *>(&*ast))
@ -1439,14 +1466,29 @@ void ExpressionAnalyzer::getArrayJoinedColumnsImpl(ASTPtr ast)
if (node->kind == ASTIdentifier::Column)
{
String table_name = DataTypeNested::extractNestedTableName(node->name);
if (array_join_alias_to_name.count(node->name))
array_join_result_to_source[node->name] = array_join_alias_to_name[node->name];
{
/// ARRAY JOIN was written with an array column. Example: SELECT K1 FROM ... ARRAY JOIN ParsedParams.Key1 AS K1
array_join_result_to_source[node->name] = array_join_alias_to_name[node->name]; /// K1 -> ParsedParams.Key1
}
else if (array_join_alias_to_name.count(table_name))
{
String nested_column = DataTypeNested::extractNestedColumnName(node->name);
array_join_result_to_source[node->name]
/// ARRAY JOIN was written with a nested table. Example: SELECT PP.Key1 FROM ... ARRAY JOIN ParsedParams AS PP
String nested_column = DataTypeNested::extractNestedColumnName(node->name); /// Key1
array_join_result_to_source[node->name] /// PP.Key1 -> ParsedParams.Key1
= DataTypeNested::concatenateNestedName(array_join_alias_to_name[table_name], nested_column);
}
else if (array_join_name_to_alias.count(table_name))
{
/** Example: SELECT ParsedParams.Key1 FROM ... ARRAY JOIN ParsedParams AS PP.
 * That is, the query uses the original array, unrolled over itself.
 */
String nested_column = DataTypeNested::extractNestedColumnName(node->name); /// Key1
array_join_result_to_source[ /// PP.Key1 -> ParsedParams.Key1
DataTypeNested::concatenateNestedName(array_join_name_to_alias[table_name], nested_column)] = node->name;
}
}
}
else
@ -1488,10 +1530,12 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
if (node->kind == ASTFunction::LAMBDA_EXPRESSION)
throw Exception("Unexpected expression", ErrorCodes::UNEXPECTED_EXPRESSION);
/// The arrayJoin function.
if (node->kind == ASTFunction::ARRAY_JOIN)
{
if (node->arguments->children.size() != 1)
throw Exception("arrayJoin requires exactly 1 argument", ErrorCodes::TYPE_MISMATCH);
ASTPtr arg = node->arguments->children.at(0);
getActionsImpl(arg, no_subqueries, only_consts, actions_stack);
if (!only_consts)
@ -1787,13 +1831,17 @@ void ExpressionAnalyzer::initChain(ExpressionActionsChain & chain, const NamesAn
}
}
/// "Большой" ARRAY JOIN.
void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions) const
{
NameSet result_columns;
for (const auto & result_source : array_join_result_to_source)
{
/// Give the columns new names, if necessary.
if (result_source.first != result_source.second)
actions->add(ExpressionAction::copyColumn(result_source.second, result_source.first));
/// Perform ARRAY JOIN (replace arrays with their contents) for the columns with these new names.
result_columns.insert(result_source.first);
}


@ -33,6 +33,7 @@
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataTypes/DataTypeArray.h>
namespace DB
@ -331,18 +332,21 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
/// specific code for different data types, e.g. toFixedString(col, N) for DataTypeFixedString
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(data_type_ptr))
{
const auto conversion_function_name = "toFixedString";
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(
conversion_function_name,
"toFixedString",
ASTPtr{new ASTIdentifier{{}, tmp_column_name}},
ASTPtr{new ASTLiteral{{}, fixed_string->getN()}}),
final_column_name));
}
else if (typeid_cast<const DataTypeArray *>(data_type_ptr))
{
/// do not perform conversion on arrays, require exact type match
default_expr_list->children.emplace_back(setAlias(
col_decl.default_expression->clone(), final_column_name));
}
else
{
/// @todo fix for parametric types, results in broken code, i.e. toArray(ElementType)(col)
const auto conversion_function_name = "to" + data_type_ptr->getName();
default_expr_list->children.emplace_back(setAlias(
@ -370,15 +374,29 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
const auto name_and_type_ptr = column.first;
const auto col_decl_ptr = column.second;
if (col_decl_ptr->type)
{
const auto & tmp_column = block.getByName(col_decl_ptr->name + "_tmp");
const auto & column_name = col_decl_ptr->name;
const auto has_explicit_type = nullptr != col_decl_ptr->type;
auto & explicit_type = name_and_type_ptr->type;
/// type mismatch between explicitly specified and deduced type, add conversion
if (name_and_type_ptr->type->getName() != tmp_column.type->getName())
/// if column declaration contains explicit type, name_and_type_ptr->type is not null
if (has_explicit_type)
{
const auto & tmp_column = block.getByName(column_name + "_tmp");
const auto & deduced_type = tmp_column.type;
/// type mismatch between explicitly specified and deduced type, add conversion for non-array types
if (explicit_type->getName() != deduced_type->getName())
{
col_decl_ptr->default_expression = makeASTFunction(
"to" + name_and_type_ptr->type->getName(),
/// foolproof against defaulting array columns incorrectly
if (typeid_cast<const DataTypeArray *>(explicit_type.get()))
throw Exception{
"Default expression type mismatch for column " + column_name +
". Expected " + explicit_type->getName() + ", deduced " +
deduced_type->getName(),
ErrorCodes::TYPE_MISMATCH
};
col_decl_ptr->default_expression = makeASTFunction("to" + explicit_type->getName(),
col_decl_ptr->default_expression);
col_decl_ptr->children.clear();
@ -387,9 +405,10 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
}
}
else
name_and_type_ptr->type = block.getByName(name_and_type_ptr->name).type;
/// no explicit type, name_and_type_ptr->type is null, set to deduced type
explicit_type = block.getByName(column_name).type;
defaults.emplace(col_decl_ptr->name, ColumnDefault{
defaults.emplace(column_name, ColumnDefault{
columnDefaultTypeFromString(col_decl_ptr->default_specifier),
col_decl_ptr->default_expression
});


@ -471,6 +471,17 @@ void InterpreterSelectQuery::executeSingleQuery()
/// If there is aggregation, execute the expressions in SELECT and ORDER BY on the initiating server; otherwise, on the source servers.
query_analyzer->appendSelect(chain, need_aggregate ? !second_stage : !first_stage);
selected_columns = chain.getLastStep().required_output;
/** If there is a LIMIT, ARRAY JOIN must not be moved past ORDER BY
 * (since it changes the number of rows, and partial sorting for ORDER BY with LIMIT would stop working correctly).
 */
if (query.order_expression_list && query.limit_length && query.array_join_expression_list)
{
/// Finalize the chain of actions within which ARRAY JOIN may be moved.
chain.finalize();
chain.clear();
}
has_order_by = query_analyzer->appendOrderBy(chain, need_aggregate ? !second_stage : !first_stage);
before_order_and_select = chain.getLastActions();
chain.addStep();

dbms/src/ODBC/README Normal file

@ -0,0 +1,16 @@
Install unixodbc.
g++ -std=gnu++1y -Wall -g -shared -fPIC -lPocoFoundation -lPocoNet -o odbc.so odbc.cpp
~/.odbc.ini:
[ClickHouse]
Driver = /home/milovidov/work/metrika-core/metrica/src/dbms/src/ODBC/odbc.so
Description = ClickHouse driver
DATABASE = default
HOST = localhost
PORT = 9000
FRAMED = 0
Run
iusql -v ClickHouse

dbms/src/ODBC/odbc.cpp Normal file

File diff suppressed because it is too large.


@ -8,10 +8,8 @@ using namespace DB;
bool ParserCheckQuery::parseImpl(IParser::Pos & pos, IParser::Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
ParserWhiteSpaceOrComments ws;
ParserString s_check("CHECK", true, true);
ParserString s_table("TABLE", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_dot(".");
ParserIdentifier table_parser;
@ -50,18 +48,8 @@ bool ParserCheckQuery::parseImpl(IParser::Pos & pos, IParser::Pos end, ASTPtr &
ws.ignore(pos, end);
/// FORMAT format_name
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, query->format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*query->format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
if (!parseFormat(*query, pos, end, node, max_parsed_pos, expected))
return false;
node = query;
return true;


@ -0,0 +1,28 @@
#include <DB/Parsers/ParserQueryWithOutput.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ExpressionElementParsers.h>
namespace DB
{
bool ParserQueryWithOutput::parseFormat(ASTQueryWithOutput & query, Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected)
{
ParserString s_format("FORMAT", true, true);
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, query.format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*(query.format)).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
return true;
}
}


@ -19,7 +19,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ASTSelectQuery * select_query = new ASTSelectQuery;
node = select_query;
ParserWhiteSpaceOrComments ws;
ParserString s_select("SELECT", true, true);
ParserString s_distinct("DISTINCT", true, true);
ParserString s_from("FROM", true, true);
@ -39,7 +38,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ParserString s_order("ORDER", true, true);
ParserString s_limit("LIMIT", true, true);
ParserString s_settings("SETTINGS", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_union("UNION", true, true);
ParserString s_all("ALL", true, true);
@ -310,22 +308,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
}
bool has_format = false;
/// FORMAT format_name
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, select_query->format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*select_query->format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
has_format = true;
}
if (!parseFormat(*select_query, pos, end, node, max_parsed_pos, expected))
return false;
// UNION ALL select query
if (s_union.ignore(pos, end, max_parsed_pos, expected))
@ -334,7 +319,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
if (s_all.ignore(pos, end, max_parsed_pos, expected))
{
if (has_format)
if (!select_query->format.isNull())
{
/// FORMAT may be specified only in the last query of a UNION ALL chain.
expected = "FORMAT only in the last SELECT of the UNION ALL chain";


@ -15,20 +15,17 @@ bool ParserShowTablesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & m
{
Pos begin = pos;
ParserWhiteSpaceOrComments ws;
ParserString s_show("SHOW", true, true);
ParserString s_tables("TABLES", true, true);
ParserString s_databases("DATABASES", true, true);
ParserString s_from("FROM", true, true);
ParserString s_not("NOT", true, true);
ParserString s_like("LIKE", true, true);
ParserString s_format("FORMAT", true, true);
ParserStringLiteral like_p;
ParserIdentifier name_p;
ASTPtr like;
ASTPtr database;
ASTPtr format;
ASTShowTablesQuery * query = new ASTShowTablesQuery;
ASTPtr query_ptr = query;
@ -80,18 +77,9 @@ bool ParserShowTablesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & m
ws.ignore(pos, end);
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
/// FORMAT format_name
if (!parseFormat(*query, pos, end, node, max_parsed_pos, expected))
return false;
query->range = StringRange(begin, pos);
@ -99,11 +87,8 @@ bool ParserShowTablesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & m
query->from = typeid_cast<ASTIdentifier &>(*database).name;
if (like)
query->like = safeGet<const String &>(typeid_cast<ASTLiteral &>(*like).value);
if (format)
{
query->format = format;
query->children.push_back(format);
}
if (query->format)
query->children.push_back(query->format);
node = query_ptr;


@ -13,20 +13,17 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Po
{
Pos begin = pos;
ParserWhiteSpaceOrComments ws;
ParserString s_exists("EXISTS", true, true);
ParserString s_describe("DESCRIBE", true, true);
ParserString s_desc("DESC", true, true);
ParserString s_show("SHOW", true, true);
ParserString s_create("CREATE", true, true);
ParserString s_table("TABLE", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_dot(".");
ParserIdentifier name_p;
ASTPtr database;
ASTPtr table;
ASTPtr format;
ASTPtr query_ptr;
ws.ignore(pos, end);
@ -53,6 +50,7 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Po
return false;
}
ASTQueryWithTableAndOutput * query = dynamic_cast<ASTQueryWithTableAndOutput *>(&*query_ptr);
ws.ignore(pos, end);
@ -76,20 +74,9 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Po
ws.ignore(pos, end);
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
ASTQueryWithTableAndOutput * query = dynamic_cast<ASTQueryWithTableAndOutput *>(&*query_ptr);
/// FORMAT format_name
if (!parseFormat(*query, pos, end, node, max_parsed_pos, expected))
return false;
query->range = StringRange(begin, pos);
@ -97,11 +84,8 @@ bool ParserTablePropertiesQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Po
query->database = typeid_cast<ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<ASTIdentifier &>(*table).name;
if (format)
{
query->format = format;
query->children.push_back(format);
}
if (query->format)
query->children.push_back(query->format);
node = query_ptr;


@ -8,12 +8,12 @@
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Parsers/ASTIdentifier.h>
namespace DB
{
void AlterCommand::apply(NamesAndTypesList & columns,
NamesAndTypesList & materialized_columns,
NamesAndTypesList & alias_columns,
ColumnDefaults & column_defaults) const
void AlterCommand::apply(
NamesAndTypesList & columns, NamesAndTypesList & materialized_columns, NamesAndTypesList & alias_columns,
ColumnDefaults & column_defaults) const
{
if (type == ADD)
{
@ -27,7 +27,7 @@ namespace DB
exists_in(alias_columns))
{
throw Exception{
"Cannot add column " + column_name + ": column with this name already exisits.",
"Cannot add column " + column_name + ": column with this name already exists",
DB::ErrorCodes::ILLEGAL_COLUMN
};
}
@ -99,40 +99,37 @@ namespace DB
}
else if (type == MODIFY)
{
const auto it = column_defaults.find(column_name);
const auto had_default_expr = it != column_defaults.end();
const auto old_default_type = had_default_expr ? it->second.type : ColumnDefaultType{};
const auto default_it = column_defaults.find(column_name);
const auto had_default_expr = default_it != std::end(column_defaults);
const auto old_default_type = had_default_expr ? default_it->second.type : ColumnDefaultType{};
/// allow conversion between DEFAULT and MATERIALIZED
const auto default_materialized_conversion =
(old_default_type == ColumnDefaultType::Default && default_type == ColumnDefaultType::Materialized) ||
(old_default_type == ColumnDefaultType::Materialized && default_type == ColumnDefaultType::Default);
if (old_default_type != default_type && !default_materialized_conversion)
throw Exception{"Cannot change column default specifier from " + toString(old_default_type) +
" to " + toString(default_type), ErrorCodes::INCORRECT_QUERY};
/// target column list
auto & new_columns = default_type == ColumnDefaultType::Default ?
columns : default_type == ColumnDefaultType::Materialized ?
materialized_columns : alias_columns;
/// find column or throw exception
const auto find_column = [this] (NamesAndTypesList & columns) {
const auto it = std::find_if(columns.begin(), columns.end(),
std::bind(namesEqual, std::cref(column_name), std::placeholders::_1) );
if (it == columns.end())
throw Exception("Wrong column name. Cannot find column " + column_name + " to modify.",
throw Exception("Wrong column name. Cannot find column " + column_name + " to modify",
DB::ErrorCodes::ILLEGAL_COLUMN);
return it;
};
/// remove from the old list, add to the new list in case of DEFAULT <-> MATERIALIZED alteration
if (default_materialized_conversion)
/// if default types differ, remove column from the old list, then add to the new list
if (default_type != old_default_type)
{
const auto was_default = old_default_type == ColumnDefaultType::Default;
auto & old_columns = was_default ? columns : materialized_columns;
auto & new_columns = was_default ? materialized_columns : columns;
/// source column list
auto & old_columns = old_default_type == ColumnDefaultType::Default ?
columns : old_default_type == ColumnDefaultType::Materialized ?
materialized_columns : alias_columns;
const auto column_it = find_column(old_columns);
new_columns.emplace_back(*column_it);
old_columns.erase(column_it);
const auto old_column_it = find_column(old_columns);
new_columns.emplace_back(*old_column_it);
old_columns.erase(old_column_it);
/// do not forget to change the default type of old column
if (had_default_expr)
@ -140,19 +137,17 @@ namespace DB
}
/// find column in one of three column lists
const auto column_it = find_column(
default_type == ColumnDefaultType::Default ? columns :
default_type == ColumnDefaultType::Materialized ? materialized_columns :
alias_columns);
const auto column_it = find_column(new_columns);
column_it->type = data_type;
/// remove, add or update default_expression
if (!default_expression && had_default_expr)
/// the new column has no default expression, remove it from column_defaults along with its type
column_defaults.erase(column_name);
else if (default_expression && !had_default_expr)
/// the new column has a default expression while the old one did not; add it to column_defaults
column_defaults.emplace(column_name, ColumnDefault{default_type, default_expression});
else if (had_default_expr)
/// both the old and the new column have a default expression; update it
column_defaults[column_name].expression = default_expression;
}
else
@ -184,7 +179,7 @@ namespace DB
columns.insert(std::end(columns), std::begin(table->alias_columns), std::end(table->alias_columns));
auto defaults = table->column_defaults;
std::vector<std::pair<String, AlterCommand *>> defaulted_columns{};
std::vector<std::pair<NameAndTypePair, AlterCommand *>> defaulted_columns{};
ASTPtr default_expr_list{new ASTExpressionList};
default_expr_list->children.reserve(defaults.size());
@ -193,32 +188,44 @@ namespace DB
{
if (command.type == AlterCommand::ADD || command.type == AlterCommand::MODIFY)
{
if (command.type == AlterCommand::MODIFY)
const auto & column_name = command.column_name;
const auto column_it = std::find_if(std::begin(columns), std::end(columns),
std::bind(AlterCommand::namesEqual, std::cref(command.column_name), std::placeholders::_1));
if (command.type == AlterCommand::ADD)
{
if (std::end(columns) != column_it)
throw Exception{
"Cannot add column " + column_name + ": column with this name already exisits",
DB::ErrorCodes::ILLEGAL_COLUMN
};
}
else if (command.type == AlterCommand::MODIFY)
{
const auto it = std::find_if(std::begin(columns), std::end(columns),
std::bind(AlterCommand::namesEqual, std::cref(command.column_name), std::placeholders::_1));
if (it == std::end(columns))
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to modify.",
DB::ErrorCodes::ILLEGAL_COLUMN);
if (std::end(columns) == column_it)
throw Exception{
"Wrong column name. Cannot find column " + column_name + " to modify",
DB::ErrorCodes::ILLEGAL_COLUMN
};
columns.erase(it);
defaults.erase(command.column_name);
columns.erase(column_it);
defaults.erase(column_name);
}
/// we're creating dummy DataTypeUInt8 in order to prevent the NullPointerException in ExpressionActions
columns.emplace_back(command.column_name, command.data_type ? command.data_type : new DataTypeUInt8);
columns.emplace_back(column_name, command.data_type ? command.data_type : new DataTypeUInt8);
if (command.default_expression)
{
if (command.data_type)
{
const auto & final_column_name = command.column_name;
const auto & final_column_name = column_name;
const auto tmp_column_name = final_column_name + "_tmp";
const auto data_type_ptr = command.data_type.get();
const auto column_type_raw_ptr = command.data_type.get();
/// specific code for different data types, e.g. toFixedString(col, N) for DataTypeFixedString
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(data_type_ptr))
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(column_type_raw_ptr))
{
const auto conversion_function_name = "toFixedString";
@ -229,10 +236,15 @@ namespace DB
ASTPtr{new ASTLiteral{{}, fixed_string->getN()}}),
final_column_name));
}
else if (typeid_cast<const DataTypeArray *>(column_type_raw_ptr))
{
/// do not perform conversion on arrays, require exact type match
default_expr_list->children.emplace_back(setAlias(
command.default_expression->clone(), final_column_name));
}
else
{
/// @todo fix for parametric types, results in broken code, i.e. toArray(ElementType)(col)
const auto conversion_function_name = "to" + data_type_ptr->getName();
const auto conversion_function_name = "to" + column_type_raw_ptr->getName();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
@ -241,14 +253,15 @@ namespace DB
default_expr_list->children.emplace_back(setAlias(command.default_expression->clone(), tmp_column_name));
defaulted_columns.emplace_back(command.column_name, &command);
defaulted_columns.emplace_back(NameAndTypePair{column_name, command.data_type}, &command);
}
else
{
/// no type explicitly specified, will deduce later
default_expr_list->children.emplace_back(
setAlias(command.default_expression->clone(), command.column_name));
setAlias(command.default_expression->clone(), column_name));
defaulted_columns.emplace_back(command.column_name, &command);
defaulted_columns.emplace_back(NameAndTypePair{column_name, nullptr}, &command);
}
}
}
@ -271,7 +284,7 @@ namespace DB
++it;
if (!found)
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to drop.",
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to drop",
DB::ErrorCodes::ILLEGAL_COLUMN);
}
}
@ -285,15 +298,38 @@ namespace DB
return AlterCommand::namesEqual(column_name, name_type);
});
const auto tmp_column_name = column_name + "_tmp";
const auto conversion_function_name = "to" + column_it->type->getName();
const auto & column_type_ptr = column_it->type;
const auto column_type_raw_ptr = column_type_ptr.get();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
column_name));
/// specific code for different data types, e.g. toFixedString(col, N) for DataTypeFixedString
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(column_type_raw_ptr))
{
default_expr_list->children.emplace_back(setAlias(
makeASTFunction("toFixedString",
ASTPtr{new ASTIdentifier{{}, tmp_column_name}},
ASTPtr{new ASTLiteral{{}, fixed_string->getN()}}),
column_name));
}
else if (typeid_cast<const DataTypeArray *>(column_type_raw_ptr))
{
/// do not perform conversion on arrays, require exact type match
default_expr_list->children.emplace_back(setAlias(
col_def.second.expression->clone(),
column_name));
}
else
{
const auto conversion_function_name = "to" + column_it->type->getName();
default_expr_list->children.emplace_back(setAlias(
makeASTFunction(conversion_function_name,
ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
column_name));
}
default_expr_list->children.emplace_back(setAlias(col_def.second.expression->clone(), tmp_column_name));
defaulted_columns.emplace_back(column_name, nullptr);
defaulted_columns.emplace_back(NameAndTypePair{column_name, column_type_ptr}, nullptr);
}
const auto actions = ExpressionAnalyzer{default_expr_list, context, {}, columns}.getActions(true);
@ -302,41 +338,62 @@ namespace DB
/// set deduced types, modify default expression if necessary
for (auto & defaulted_column : defaulted_columns)
{
const auto & column_name = defaulted_column.first;
const auto command_ptr = defaulted_column.second;
const auto & column = block.getByName(column_name);
const auto & name_and_type = defaulted_column.first;
AlterCommand * & command_ptr = defaulted_column.second;
const auto & column_name = name_and_type.name;
const auto has_explicit_type = nullptr != name_and_type.type;
/// default expression on old column
if (!command_ptr)
if (has_explicit_type)
{
const auto & explicit_type = name_and_type.type;
const auto & tmp_column = block.getByName(column_name + "_tmp");
const auto & deduced_type = tmp_column.type;
// a column not specified explicitly in the ALTER query may require default_expression modification
if (column.type->getName() != tmp_column.type->getName())
if (explicit_type->getName() != deduced_type->getName())
{
const auto it = defaults.find(column_name);
this->push_back(AlterCommand{
AlterCommand::MODIFY, column_name, column.type, it->second.type,
makeASTFunction("to" + column.type->getName(), it->second.expression),
});
}
}
else if (command_ptr && command_ptr->data_type)
{
const auto & tmp_column = block.getByName(column_name + "_tmp");
const auto default_it = defaults.find(column_name);
/// type mismatch between explicitly specified and deduced type, add conversion
if (column.type->getName() != tmp_column.type->getName())
{
command_ptr->default_expression = makeASTFunction(
"to" + column.type->getName(),
command_ptr->default_expression->clone());
/// column has no associated alter command, let's create it
if (!command_ptr)
{
/// add a new alter command to modify existing column
this->emplace_back(AlterCommand{
AlterCommand::MODIFY, column_name, explicit_type,
default_it->second.type, default_it->second.expression
});
command_ptr = &this->back();
}
if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(explicit_type.get()))
{
command_ptr->default_expression = makeASTFunction("toFixedString",
command_ptr->default_expression->clone(),
ASTPtr{new ASTLiteral{{}, fixed_string->getN()}});
}
else if (typeid_cast<const DataTypeArray *>(explicit_type.get()))
{
/// foolproof against defaulting array columns incorrectly
throw Exception{
"Default expression type mismatch for column " + column_name + ". Expected " +
explicit_type->getName() + ", deduced " + deduced_type->getName(),
ErrorCodes::TYPE_MISMATCH
};
}
else
{
command_ptr->default_expression = makeASTFunction("to" + explicit_type->getName(),
command_ptr->default_expression->clone());
}
}
}
else
{
/// just set deduced type
command_ptr->data_type = column.type;
command_ptr->data_type = block.getByName(column_name).type;
}
}
}


@ -557,6 +557,8 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
if (expression)
{
MarkRanges ranges(1, MarkRange(0, part->size));
/** @todo expression->getRequiredColumns may contain integer width columns for the FixedString(N) type which, after
 * passing them to ITableDeclaration::check, will trigger an exception about an unknown column `N` */
BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/',
DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE);
ExpressionBlockInputStream in(part_in, expression);


@ -80,7 +80,7 @@ StorageDistributed::StorageDistributed(
context(context_), cluster(cluster_),
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, *columns).getActions(false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
write_enabled(!data_path_.empty() && (cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)),
write_enabled(!data_path_.empty() && (((cluster.getLocalShardCount() + cluster.getRemoteShardCount()) < 2) || sharding_key_)),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
{
createDirectoryMonitors();
@ -104,7 +104,7 @@ StorageDistributed::StorageDistributed(
context(context_), cluster(cluster_),
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, *columns).getActions(false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
write_enabled(!data_path_.empty() && (cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)),
write_enabled(!data_path_.empty() && (((cluster.getLocalShardCount() + cluster.getRemoteShardCount()) < 2) || sharding_key_)),
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
{
createDirectoryMonitors();
@ -168,7 +168,7 @@ BlockInputStreams StorageDistributed::read(
/// Makes no sense on remote servers, since the query is usually sent under a different user.
new_settings.max_concurrent_queries_for_user = 0;
size_t result_size = (cluster.pools.size() * settings.max_parallel_replicas) + cluster.getLocalNodesNum();
size_t result_size = (cluster.getRemoteShardCount() * settings.max_parallel_replicas) + cluster.getLocalShardCount();
processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
? QueryProcessingStage::Complete
@ -193,26 +193,31 @@ BlockInputStreams StorageDistributed::read(
external_tables = context.getExternalTables();
/// Loop over the shards.
for (auto & conn_pool : cluster.pools)
res.emplace_back(new RemoteBlockInputStream{
conn_pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
/// Add queries to the local ClickHouse.
if (cluster.getLocalNodesNum() > 0)
for (const auto & shard_info : cluster.getShardsInfo())
{
DB::Context new_context = context;
new_context.setSettings(new_settings);
for (size_t i = 0; i < cluster.getLocalNodesNum(); ++i)
if (shard_info.isLocal())
{
InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
/// Add queries to the local ClickHouse.
/** Materialization is needed, because constants arrive from remote servers already materialized.
 * If this is not done, different threads would produce columns of different types (Const and non-Const),
 * which is not allowed, since all the code assumes that all types in a stream of blocks are the same.
 */
res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
DB::Context new_context = context;
new_context.setSettings(new_settings);
for (const auto & address : shard_info.local_addresses)
{
InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
/** Materialization is needed, because constants arrive from remote servers already materialized.
 * If this is not done, different threads would produce columns of different types (Const and non-Const),
 * which is not allowed, since all the code assumes that all types in a stream of blocks are the same.
 */
res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
}
}
else
{
res.emplace_back(new RemoteBlockInputStream{
shard_info.pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
}
}
@ -274,26 +279,29 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
BlockInputStreams res;
/// Loop over the shards.
for (auto & conn_pool : cluster.pools)
for (const auto & shard_info : cluster.getShardsInfo())
{
auto stream = new RemoteBlockInputStream{conn_pool, query, &new_settings, throttler};
stream->reachAllReplicas();
stream->appendExtraInfo();
res.emplace_back(stream);
}
/// Add queries to the local ClickHouse.
if (cluster.getLocalNodesNum() > 0)
{
DB::Context new_context = context;
new_context.setSettings(new_settings);
const auto & local_addresses = cluster.getLocalShardsInfo();
for (const auto & address : local_addresses)
if (shard_info.isLocal())
{
InterpreterDescribeQuery interpreter(ast, new_context);
BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
/// Add queries to the local ClickHouse.
DB::Context new_context = context;
new_context.setSettings(new_settings);
for (const auto & address : shard_info.local_addresses)
{
InterpreterDescribeQuery interpreter(ast, new_context);
BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
res.emplace_back(stream);
}
}
if (shard_info.hasRemoteConnections())
{
auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler};
stream->reachAllReplicas();
stream->appendExtraInfo();
res.emplace_back(stream);
}
}
@ -340,7 +348,7 @@ void StorageDistributed::requireDirectoryMonitor(const std::string & name)
size_t StorageDistributed::getShardCount() const
{
return cluster.pools.size();
return cluster.getRemoteShardCount();
}
}


@ -72,9 +72,9 @@ BlockInputStreams StorageSystemClusters::read(
{
const std::string cluster_name = entry.first;
const Cluster & cluster = entry.second;
const auto & addresses = cluster.getShardsInfo();
const auto & addresses_with_failover = cluster.getShardsWithFailoverInfo();
const auto & shards_info = cluster.shard_info_vec;
const auto & addresses = cluster.getShardsAddresses();
const auto & addresses_with_failover = cluster.getShardsWithFailoverAddresses();
const auto & shards_info = cluster.getShardsInfo();
if (!addresses.empty())
{


@ -1 +1,2 @@
SELECT min(ts = toUInt32(toDateTime(toString(ts)))) FROM (SELECT 1000000000 + 1234 * number AS ts FROM system.numbers LIMIT 1000000);
SELECT min(ts = toUInt32(toDateTime(toString(ts)))) FROM (SELECT 10000 + 1234 * number AS ts FROM system.numbers LIMIT 1000000);


@ -23,21 +23,186 @@
1970-01-02 09:00:00 1970-01-02 10:00:00
1970-01-02 18:00:00 1970-01-02 18:00:00
1970-01-02 01:30:00 1970-01-02 01:30:00
2014
2014
2014
2014
2014
9
9
9
10
9
30
30
30
1
30
2
2
2
3
2
23
21
20
4
11
50
50
50
50
50
0
0
0
0
0
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:50:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014-09-30 23:00:00
2014
2014
2014
2014
2014
24177
24177
24177
24178
24177
2334
2334
2334
2334
2334
16343
16343
16343
16344
16343
392251
392251
392251
392251
392251
23535110
23535110
23535110
23535110
23535110
1412106600
1412106600
1412106600
1412106600
1412106600
2014-09-30
2014-09-30
2014-09-30
2014-10-01
2014-09-30
2014-09-30
2014-09-30
2014-09-30
2014-10-01
2014-09-30
2015-07-13 2015-07-01 2015-07-01 1970-01-02 19:30:00
2014-12-22 2014-12-01 2014-10-01 1970-01-02 21:00:00
2014-12-29 2015-01-01 2015-01-01 1970-01-02 12:00:00
2014-09-29 2014-09-01 2014-07-01 1970-01-02 21:50:00
2015-03-09 2015-03-01 2015-01-01 1970-01-02 02:00:00
2015 7 15 3
2014 12 28 7
2015 1 1 4
2014 9 30 2
2015 3 15 7
19 30 0 2015-07-15 13:30:00
21 0 0 2014-12-29 00:00:00
12 0 0 2015-01-01 12:00:00
21 50 0 2014-09-30 23:50:00
2 30 0 2015-03-15 13:30:00
2015-07-15 13:00:00 2015 24187 2375
2014-12-29 00:00:00 2014 24180 2346
2015-01-01 12:00:00 2015 24181 2347
2014-09-30 23:00:00 2014 24177 2334
2015-03-15 13:00:00 2015 24183 2357
16631 399154 23949270 1436956200
16432 394389 23663340 1419800400
16436 394473 23668380 1420102800
16343 392251 23535110 1412106600
16509 396226 23773590 1426415400
2015-07-15 2015-07-15
2014-12-28 2014-12-28
2015-01-01 2015-01-01
2014-09-30 2014-09-30
2015-03-15 2015-03-15
2014-09-29 2014-09-01 2014-10-01 1970-01-02 11:20:00
2014-12-22 2014-12-01 2014-10-01 1970-01-02 13:30:00
2014-12-29 2015-01-01 2015-01-01 1970-01-02 01:30:00
2015-03-09 2015-03-01 2015-01-01 1970-01-02 02:00:00
2015-07-13 2015-07-01 2015-07-01 1970-01-02 02:00:00
2015 7 15 3
2014 12 29 7
2015 1 1 4
2014 9 1 2
2015 3 15 7
12 30 0 2015-07-15 13:30:00
22 0 0 2014-12-29 00:00:00
10 0 0 2015-01-01 12:00:00
21 50 0 2014-09-30 23:50:00
11 30 0 2015-03-15 13:30:00
2015-07-15 13:00:00 2015 24187 2375
2014-12-29 00:00:00 2014 24180 2346
2015-01-01 12:00:00 2015 24181 2347
2014-09-30 23:00:00 2014 24178 2334
2015-03-15 13:00:00 2015 24183 2357
16631 399154 23949270 1436956200
16432 394389 23663340 1419800400
16436 394473 23668380 1420102800
16343 392251 23535110 1412106600
16509 396226 23773590 1426415400
2015-07-15 2015-07-15
2014-12-28 2014-12-28
2015-01-01 2015-01-01
2014-09-30 2014-09-30
2015-03-15 2015-03-15
2015-03-09 2015-03-01 2015-01-01 1970-01-02 19:30:00
2015-03-09 2015-03-01 2015-01-01 1970-01-02 10:30:00
2015-03-09 2015-03-01 2015-01-01 1970-01-02 13:30:00
2015-03-09 2015-03-01 2015-01-01 1970-01-02 11:30:00
2015-03-09 2015-03-01 2015-01-01 1970-01-02 02:00:00
2015 3 15 7
2015 3 15 7
2015 3 15 7
2015 3 15 7
2015 3 15 7
19 30 0 2015-03-15 13:30:00
10 30 0 2015-03-15 13:30:00
13 30 0 2015-03-15 13:30:00
11 30 0 2015-03-15 13:30:00
2 30 0 2015-03-15 13:30:00
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
2015-03-15 13:00:00 2015 24183 2357
16509 396226 23773590 1426415400
16509 396226 23773590 1426415400
16509 396226 23773590 1426415400
16509 396226 23773590 1426415400
16509 396226 23773590 1426415400
2015-03-15 2015-03-15
2015-03-15 2015-03-15
2015-03-15 2015-03-15
2015-03-15 2015-03-15
2015-03-15 2015-03-15
2015-07-15 13:30:00
2015-07-15 12:30:00
2015-07-15 11:30:00
View File

@@ -46,6 +46,148 @@ SELECT toTime(toDateTime(1420102800), 'Europe/London'), toTime(toDateTime(142831
 SELECT toTime(toDateTime(1420102800), 'Asia/Tokyo'), toTime(toDateTime(1428310800), 'Asia/Tokyo');
 SELECT toTime(toDateTime(1420102800), 'Pacific/Pitcairn'), toTime(toDateTime(1428310800), 'Pacific/Pitcairn');
+/* toYear */
+SELECT toYear(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toYear(toDateTime(1412106600), 'Europe/Paris');
+SELECT toYear(toDateTime(1412106600), 'Europe/London');
+SELECT toYear(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toYear(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toMonth */
+SELECT toMonth(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toMonth(toDateTime(1412106600), 'Europe/Paris');
+SELECT toMonth(toDateTime(1412106600), 'Europe/London');
+SELECT toMonth(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toMonth(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toDayOfMonth */
+SELECT toDayOfMonth(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toDayOfMonth(toDateTime(1412106600), 'Europe/Paris');
+SELECT toDayOfMonth(toDateTime(1412106600), 'Europe/London');
+SELECT toDayOfMonth(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toDayOfMonth(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toDayOfWeek */
+SELECT toDayOfWeek(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toDayOfWeek(toDateTime(1412106600), 'Europe/Paris');
+SELECT toDayOfWeek(toDateTime(1412106600), 'Europe/London');
+SELECT toDayOfWeek(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toDayOfWeek(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toHour */
+SELECT toHour(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toHour(toDateTime(1412106600), 'Europe/Paris');
+SELECT toHour(toDateTime(1412106600), 'Europe/London');
+SELECT toHour(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toHour(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toMinute */
+SELECT toMinute(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toMinute(toDateTime(1412106600), 'Europe/Paris');
+SELECT toMinute(toDateTime(1412106600), 'Europe/London');
+SELECT toMinute(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toMinute(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toSecond */
+SELECT toSecond(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toSecond(toDateTime(1412106600), 'Europe/Paris');
+SELECT toSecond(toDateTime(1412106600), 'Europe/London');
+SELECT toSecond(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toSecond(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toStartOfMinute */
+SELECT toStartOfMinute(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toStartOfMinute(toDateTime(1412106600), 'Europe/Paris');
+SELECT toStartOfMinute(toDateTime(1412106600), 'Europe/London');
+SELECT toStartOfMinute(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toStartOfMinute(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toStartOfHour */
+SELECT toStartOfHour(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toStartOfHour(toDateTime(1412106600), 'Europe/Paris');
+SELECT toStartOfHour(toDateTime(1412106600), 'Europe/London');
+SELECT toStartOfHour(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toStartOfHour(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeYearNum */
+SELECT toRelativeYearNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeYearNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeYearNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeYearNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeYearNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeMonthNum */
+SELECT toRelativeMonthNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeMonthNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeMonthNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeMonthNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeMonthNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeWeekNum */
+SELECT toRelativeWeekNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeWeekNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeWeekNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeWeekNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeWeekNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeDayNum */
+SELECT toRelativeDayNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeDayNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeDayNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeDayNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeDayNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeHourNum */
+SELECT toRelativeHourNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeHourNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeHourNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeHourNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeHourNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeMinuteNum */
+SELECT toRelativeMinuteNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeMinuteNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeMinuteNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeMinuteNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeMinuteNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toRelativeSecondNum */
+SELECT toRelativeSecondNum(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toRelativeSecondNum(toDateTime(1412106600), 'Europe/Paris');
+SELECT toRelativeSecondNum(toDateTime(1412106600), 'Europe/London');
+SELECT toRelativeSecondNum(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toRelativeSecondNum(toDateTime(1412106600), 'Pacific/Pitcairn');
+/* toDate */
+SELECT toDate(toDateTime(1412106600), 'Europe/Moscow');
+SELECT toDate(toDateTime(1412106600), 'Europe/Paris');
+SELECT toDate(toDateTime(1412106600), 'Europe/London');
+SELECT toDate(toDateTime(1412106600), 'Asia/Tokyo');
+SELECT toDate(toDateTime(1412106600), 'Pacific/Pitcairn');
+SELECT toDate(1412106600, 'Europe/Moscow');
+SELECT toDate(1412106600, 'Europe/Paris');
+SELECT toDate(1412106600, 'Europe/London');
+SELECT toDate(1412106600, 'Asia/Tokyo');
+SELECT toDate(1412106600, 'Pacific/Pitcairn');
 DROP TABLE IF EXISTS foo;
 CREATE TABLE foo(x Int32, y String) ENGINE=Memory;
 INSERT INTO foo(x, y) VALUES(1420102800, 'Europe/Moscow');
@@ -55,8 +197,25 @@ INSERT INTO foo(x, y) VALUES(1436956200, 'Asia/Tokyo');
 INSERT INTO foo(x, y) VALUES(1426415400, 'Pacific/Pitcairn');
 SELECT toMonday(toDateTime(x), y), toStartOfMonth(toDateTime(x), y), toStartOfQuarter(toDateTime(x), y), toTime(toDateTime(x), y) FROM foo ORDER BY y ASC;
 SELECT toYear(toDateTime(x), y), toMonth(toDateTime(x), y), toDayOfMonth(toDateTime(x), y), toDayOfWeek(toDateTime(x), y) FROM foo ORDER BY y ASC;
 SELECT toHour(toDateTime(x), y), toMinute(toDateTime(x), y), toSecond(toDateTime(x), y), toStartOfMinute(toDateTime(x), y) FROM foo ORDER BY y ASC;
 SELECT toStartOfHour(toDateTime(x), y), toRelativeYearNum(toDateTime(x), y), toRelativeMonthNum(toDateTime(x), y), toRelativeWeekNum(toDateTime(x), y) FROM foo ORDER BY y ASC;
 SELECT toRelativeDayNum(toDateTime(x), y), toRelativeHourNum(toDateTime(x), y), toRelativeMinuteNum(toDateTime(x), y), toRelativeSecondNum(toDateTime(x), y) FROM foo ORDER BY y ASC;
 SELECT toDate(toDateTime(x), y), toDate(x, y) FROM foo ORDER BY y ASC;
+SELECT toMonday(toDateTime(x), 'Europe/Paris'), toStartOfMonth(toDateTime(x), 'Europe/London'), toStartOfQuarter(toDateTime(x), 'Asia/Tokyo'), toTime(toDateTime(x), 'Pacific/Pitcairn') FROM foo ORDER BY x ASC;
+SELECT toYear(toDateTime(x), 'Europe/Paris'), toMonth(toDateTime(x), 'Europe/London'), toDayOfMonth(toDateTime(x), 'Asia/Tokyo'), toDayOfWeek(toDateTime(x), 'Pacific/Pitcairn') FROM foo ORDER BY y ASC;
+SELECT toHour(toDateTime(x), 'Europe/Paris'), toMinute(toDateTime(x), 'Europe/London'), toSecond(toDateTime(x), 'Asia/Tokyo'), toStartOfMinute(toDateTime(x), 'Pacific/Pitcairn') FROM foo ORDER BY y ASC;
+SELECT toStartOfHour(toDateTime(x), 'Europe/Paris'), toRelativeYearNum(toDateTime(x), 'Europe/London'), toRelativeMonthNum(toDateTime(x), 'Asia/Tokyo'), toRelativeWeekNum(toDateTime(x), 'Pacific/Pitcairn') FROM foo ORDER BY y ASC;
+SELECT toRelativeDayNum(toDateTime(x), 'Europe/Paris'), toRelativeHourNum(toDateTime(x), 'Europe/London'), toRelativeMinuteNum(toDateTime(x), 'Asia/Tokyo'), toRelativeSecondNum(toDateTime(x), 'Pacific/Pitcairn') FROM foo ORDER BY y ASC;
+SELECT toDate(toDateTime(x), 'Europe/Paris'), toDate(x, 'Europe/Paris') FROM foo ORDER BY y ASC;
+SELECT toMonday(toDateTime(1426415400), y), toStartOfMonth(toDateTime(1426415400), y), toStartOfQuarter(toDateTime(1426415400), y), toTime(toDateTime(1426415400), y) FROM foo ORDER BY y ASC;
+SELECT toYear(toDateTime(1426415400), y), toMonth(toDateTime(1426415400), y), toDayOfMonth(toDateTime(1426415400), y), toDayOfWeek(toDateTime(1426415400), y) FROM foo ORDER BY y ASC;
+SELECT toHour(toDateTime(1426415400), y), toMinute(toDateTime(1426415400), y), toSecond(toDateTime(1426415400), y), toStartOfMinute(toDateTime(1426415400), y) FROM foo ORDER BY y ASC;
+SELECT toStartOfHour(toDateTime(1426415400), y), toRelativeYearNum(toDateTime(1426415400), y), toRelativeMonthNum(toDateTime(1426415400), y), toRelativeWeekNum(toDateTime(1426415400), y) FROM foo ORDER BY y ASC;
+SELECT toRelativeDayNum(toDateTime(1426415400), y), toRelativeHourNum(toDateTime(1426415400), y), toRelativeMinuteNum(toDateTime(1426415400), y), toRelativeSecondNum(toDateTime(1426415400), y) FROM foo ORDER BY y ASC;
+SELECT toDate(toDateTime(1426415400), y), toDate(1426415400, y) FROM foo ORDER BY y ASC;
/* toString */
View File

@@ -2,9 +2,9 @@
1
1
0
1
0
1
1
1
View File

@@ -0,0 +1,29 @@
40
41
2 42
43
11
40
40
41
41
2 42
2 42
43
43
11
11
11
11
1
1
2
2
View File

@@ -0,0 +1,15 @@
select 40 as z from (select * from system.numbers limit 3) group by z;
select 41 as z from remote('127.0.0.{1,2}', system.one) group by z;
select count(), 42 AS z from remote('127.0.0.{1,2}', system.one) group by z;
select 43 AS z from remote('127.0.0.{1,2}', system.one) group by 42, 43, 44;
select 11 AS z from (SELECT 2 UNION ALL SELECT 3) group by 42, 43, 44;
select 40 as z from (select * from system.numbers limit 3) group by z WITH TOTALS;
select 41 as z from remote('127.0.0.{1,2}', system.one) group by z WITH TOTALS;
select count(), 42 AS z from remote('127.0.0.{1,2}', system.one) group by z WITH TOTALS;
select 43 AS z from remote('127.0.0.{1,2}', system.one) group by 42, 43, 44 WITH TOTALS;
select 11 AS z from (SELECT 1 UNION ALL SELECT 2) group by 42, 43, 44 WITH TOTALS;
select 11 AS z from (SELECT 2 UNION ALL SELECT 3) group by 42, 43, 44 WITH TOTALS;
SELECT count() WITH TOTALS;
SELECT count() FROM remote('127.0.0.{1,2}', system.one) WITH TOTALS;
View File

@@ -53,7 +53,6 @@ namespace ext
 	 * with each element transformed by the application of `mapper`. */
 	template <template <typename...> class Collection, typename... Params, typename Mapper>
 	auto map(const Collection<Params...> & collection, const Mapper mapper)
-		-> Collection<unqualified_t<decltype(mapper(*std::begin(collection)))>>
 	{
 		using value_type = unqualified_t<decltype(mapper(*std::begin(collection)))>;
@@ -66,7 +65,6 @@ namespace ext
 	 * Allows conversion between different container-types, e.g. std::vector to std::list */
 	template <template <typename...> class ResultCollection, typename Collection, typename Mapper>
 	auto map(const Collection & collection, const Mapper mapper)
-		-> ResultCollection<unqualified_t<decltype(mapper(*std::begin(collection)))>>
 	{
 		using value_type = unqualified_t<decltype(mapper(*std::begin(collection)))>;
@@ -78,7 +76,7 @@ namespace ext
 	 * with each element transformed by the application of `mapper`.
 	 * Allows leveraging implicit conversion between the result of applying `mapper` and R::value_type. */
 	template <typename ResultCollection, typename Collection, typename Mapper>
-	ResultCollection map(const Collection & collection, const Mapper mapper)
+	auto map(const Collection & collection, const Mapper mapper)
 	{
 		return ResultCollection(ext::make_map_iterator(std::begin(collection), mapper),
 			ext::make_map_iterator(std::end(collection), mapper));
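These hunks drop spellings that C++14 return-type deduction makes redundant: `auto map(...)` no longer needs the trailing `-> Collection<...>`, and the last overload can deduce its result from the return statement. A self-contained sketch of the same idiom, with std::decay_t standing in for unqualified_t and std::transform for make_map_iterator (both substitutions are assumptions, not taken from this file):

#include <algorithm>
#include <iterator>
#include <type_traits>
#include <vector>

// C++14 deduces the return type from the return statement, so the
// trailing "-> Collection<...>" of the C++11 version becomes redundant.
template <template <typename...> class Collection, typename... Params, typename Mapper>
auto map(const Collection<Params...> & collection, const Mapper mapper)
{
	using value_type = std::decay_t<decltype(mapper(*std::begin(collection)))>;
	Collection<value_type> result;
	std::transform(std::begin(collection), std::end(collection),
		std::inserter(result, std::end(result)), mapper);
	return result;	// deduced as Collection<value_type>
}

int main()
{
	std::vector<int> v{1, 2, 3};
	auto doubled = map(v, [](int x) { return x * 2; });	// std::vector<int>{2, 4, 6}
	return doubled[1] == 4 ? 0 : 1;
}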