commit 3d9ac4d074 by Roman Peshkurov, 2015-03-17 18:06:42 +03:00
51 changed files with 2032 additions and 298 deletions


@@ -147,10 +147,10 @@ struct __attribute__((__packed__)) SingleValueDataString
     static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64;
     static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size);

-    union
+    union __attribute__((__aligned__(1)))
     {
         char small_data[MAX_SMALL_STRING_SIZE];    /// Including the terminating zero.
-        char * large_data;
+        char * __attribute__((__aligned__(1))) large_data;
     };

    ~SingleValueDataString()
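Both __aligned__(1) annotations compensate for the enclosing __packed__ attribute: inside a packed struct, members can land on arbitrary byte offsets, so the compiler must not assume the union or the pointer member is naturally aligned. A minimal sketch of the effect, assuming GCC/Clang (illustrative, not from the commit):

#include <cstdint>

struct __attribute__((__packed__)) Packed
{
    uint8_t size;                          // offset 0
    union __attribute__((__aligned__(1)))
    {
        char small_data[7];
        char * __attribute__((__aligned__(1))) large_data;  // may sit at offset 1
    };
};

// No padding: 1 byte for `size` plus 8 for the union (the pointer dominates).
static_assert(sizeof(Packed) == 9, "union stays byte-aligned inside the packed struct");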


@@ -55,13 +55,15 @@ public:
         Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
         Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
         Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
-        Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
+        Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
+        Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
         :
         host(host_), port(port_), default_database(default_database_),
         user(user_), password(password_),
         client_name(client_name_),
         compression(compression_), data_type_factory(data_type_factory_),
         connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
+        ping_timeout(ping_timeout_),
         log_wrapper(host, port)
     {
         /// We connect not immediately, but on first demand.

@@ -178,6 +180,7 @@ private:
     Poco::Timespan connect_timeout;
     Poco::Timespan receive_timeout;
     Poco::Timespan send_timeout;
+    Poco::Timespan ping_timeout;

     /// Where to read the result of query execution from.
     SharedPtr<ReadBuffer> maybe_compressed_in;
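The dedicated ping timeout is deliberately much shorter than the send/receive defaults (5 seconds versus 300, per the defines below), so a dead server in a connection pool is detected in seconds rather than minutes.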


@@ -11,6 +11,7 @@
 #define DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS 50
 #define DBMS_DEFAULT_SEND_TIMEOUT_SEC 300
 #define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
+#define DBMS_DEFAULT_PING_TIMEOUT_SEC 5
 #define DBMS_DEFAULT_POLL_INTERVAL 10

 /// For how many seconds, at most, an insert into a MergeTree-type table can be delayed if it has many under-merged parts.

@@ -46,7 +47,9 @@
 #define DEFAULT_INTERACTIVE_DELAY 100000
 #define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
 #define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
-#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD 300 /// each period, the error counter is halved
+/// Each period, the error counter is halved.
+/// Too small a period can lead to errors disappearing right after they are created.
+#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD (2*DBMS_DEFAULT_SEND_TIMEOUT_SEC)
 #define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Maximum time to wait in the query queue.
 #define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 6
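(With DBMS_DEFAULT_SEND_TIMEOUT_SEC at 300, the decrease period grows from the fixed 300 s to 2 x 300 = 600 s, so a recorded connection error now outlives even the slowest single send.)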
@@ -68,5 +71,8 @@
 #define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100

+/// The boundary on which blocks must be aligned for asynchronous file operations.
+#define DEFAULT_AIO_FILE_BLOCK_SIZE 512
+
 #define ALWAYS_INLINE __attribute__((__always_inline__))
 #define NO_INLINE __attribute__((__noinline__))


@@ -279,6 +279,12 @@ namespace ErrorCodes
         INFINITE_LOOP,
         CANNOT_COMPRESS,
         CANNOT_DECOMPRESS,
+        AIO_SUBMIT_ERROR,
+        AIO_COMPLETION_ERROR,
+        AIO_READ_ERROR,
+        AIO_WRITE_ERROR,
+        AIO_UNALIGNED_SIZE_ERROR,
+        INDEX_NOT_USED,

         POCO_EXCEPTION = 1000,
         STD_EXCEPTION,


@@ -18,16 +18,6 @@ using Poco::SharedPtr;
 class AggregatingBlockInputStream : public IProfilingBlockInputStream
 {
 public:
-    AggregatingBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_,
-        bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
-        Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
-        : aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
-            compiler_, min_count_to_compile_, group_by_two_level_threshold_),
-        final(final_)
-    {
-        children.push_back(input_);
-    }
-
     /** keys are taken from the GROUP BY part of the query.
       * Aggregate functions are searched for everywhere in the expression.
       * Columns corresponding to the keys and to the arguments of aggregate functions must already be computed.


@@ -15,10 +15,8 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
 {
 public:
     CollapsingFinalBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
-        const String & sign_column_)
-        : description(description_), sign_column(sign_column_),
-        log(&Logger::get("CollapsingFinalBlockInputStream")),
-        first(true), count_positive(0), count_negative(0), count_incorrect_data(0), blocks_fetched(0), blocks_output(0)
+        const String & sign_column_name_)
+        : description(description_), sign_column_name(sign_column_name_)
     {
         children.insert(children.end(), inputs_.begin(), inputs_.end());
     }

@@ -40,7 +38,7 @@ public:
         for (size_t i = 0; i < description.size(); ++i)
             res << ", " << description[i].getID();

-        res << ", sign_column, " << sign_column << ")";
+        res << ", sign_column, " << sign_column_name << ")";
         return res.str();
     }

@@ -55,10 +53,10 @@ private:
     {
         MergingBlock(Block block_,
             size_t stream_index_,
-            SortDescription desc_,
+            const SortDescription & desc,
             String sign_column_name,
             BlockPlainPtrs * output_blocks)
-            : block(block_), stream_index(stream_index_), desc(desc_), refcount(0), output_blocks(output_blocks)
+            : block(block_), stream_index(stream_index_), output_blocks(output_blocks)
         {
             sort_columns.resize(desc.size());
             for (size_t i = 0; i < desc.size(); ++i)

@@ -86,8 +84,6 @@ private:
         /// Rows with the same key will be ordered by ascending stream_index.
         size_t stream_index;

-        SortDescription desc;
-
         size_t rows;

         /// Which rows need to be kept. Filled in when merging the streams.

@@ -98,7 +94,7 @@ private:
         const ColumnInt8 * sign_column;

         /// When it reaches zero, the block can be returned in the response.
-        int refcount;
+        int refcount = 0;

         /// Where to put the block once it is ready to go into the response.
         BlockPlainPtrs * output_blocks;

@@ -181,17 +177,17 @@ private:
         Cursor() {}
         explicit Cursor(MergingBlockPtr block_, size_t pos_ = 0) : block(block_), pos(pos_) {}

-        bool operator<(const Cursor & rhs) const
+        bool operator< (const Cursor & rhs) const
         {
             for (size_t i = 0; i < block->sort_columns.size(); ++i)
             {
-                int direction = block->desc[i].direction;
-                int res = direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), direction);
+                int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
                 if (res > 0)
                     return true;
                 if (res < 0)
                     return false;
             }

             return block->stream_index > rhs.block->stream_index;
         }

@@ -203,7 +199,7 @@ private:
             for (size_t i = 0; i < block->sort_columns.size(); ++i)
             {
-                int res = block->desc[i].direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
+                int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
                 if (res != 0)
                     return false;
             }

@@ -235,12 +231,12 @@ private:
     typedef std::priority_queue<Cursor> Queue;

-    SortDescription description;
-    String sign_column;
+    const SortDescription description;
+    String sign_column_name;

-    Logger * log;
+    Logger * log = &Logger::get("CollapsingFinalBlockInputStream");

-    bool first;
+    bool first = true;

     BlockPlainPtrs output_blocks;

@@ -249,15 +245,15 @@ private:
     Cursor previous;        /// The current primary key.
     Cursor last_positive;   /// The last positive row for the current primary key.

-    size_t count_positive;      /// The number of positive rows for the current primary key.
-    size_t count_negative;      /// The number of negative rows for the current primary key.
-    bool last_is_positive;      /// true if the last row for the current primary key is positive.
+    size_t count_positive = 0;      /// The number of positive rows for the current primary key.
+    size_t count_negative = 0;      /// The number of negative rows for the current primary key.
+    bool last_is_positive = false;  /// true if the last row for the current primary key is positive.

-    size_t count_incorrect_data;    /// So as not to write too many error messages to the log.
+    size_t count_incorrect_data = 0;    /// So as not to write too many error messages to the log.

     /// Count how many blocks we received as input and gave as output.
-    size_t blocks_fetched;
-    size_t blocks_output;
+    size_t blocks_fetched = 0;
+    size_t blocks_output = 0;

     void fetchNextBlock(size_t input_index);
     void commitCurrent();


@@ -117,27 +117,19 @@ public:
     /// A subset of the limits from Limits is used.
     struct LocalLimits
     {
-        LimitsMode mode;
+        LimitsMode mode = LIMITS_CURRENT;

-        size_t max_rows_to_read;
-        size_t max_bytes_to_read;
-        OverflowMode read_overflow_mode;
+        size_t max_rows_to_read = 0;
+        size_t max_bytes_to_read = 0;
+        OverflowMode read_overflow_mode = OverflowMode::THROW;

-        Poco::Timespan max_execution_time;
-        OverflowMode timeout_overflow_mode;
+        Poco::Timespan max_execution_time = 0;
+        OverflowMode timeout_overflow_mode = OverflowMode::THROW;

         /// In rows per second.
-        size_t min_execution_speed;
+        size_t min_execution_speed = 0;
         /// Check that the speed is not too low once the specified time has elapsed.
-        Poco::Timespan timeout_before_checking_execution_speed;
-
-        LocalLimits()
-            : mode(LIMITS_CURRENT),
-            max_rows_to_read(0), max_bytes_to_read(0), read_overflow_mode(OverflowMode::THROW),
-            max_execution_time(0), timeout_overflow_mode(OverflowMode::THROW),
-            min_execution_speed(0), timeout_before_checking_execution_speed(0)
-        {
-        }
+        Poco::Timespan timeout_before_checking_execution_speed = 0;
     };

     /** Set the limits to be checked on each block. */
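This is a recurring refactoring in the commit: boilerplate default constructors give way to C++11 non-static data member initializers. A minimal sketch of the pattern (names illustrative, not from the commit):

#include <cstddef>

struct Limits
{
    size_t max_rows_to_read = 0;    // the default now lives next to the member
    bool throw_on_overflow = true;

    // No hand-written Limits() needed: the implicitly generated default
    // constructor applies the initializers above, and any future constructor
    // inherits the same defaults without repeating them.
};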


@@ -16,14 +16,6 @@ using Poco::SharedPtr;
 class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream
 {
 public:
-    MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_,
-        const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
-        : aggregator(keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
-        final(final_), max_threads(max_threads_)
-    {
-        children.push_back(input_);
-    }
-
     MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
         const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
         : aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),


@@ -19,19 +19,6 @@ using Poco::SharedPtr;
 class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
 {
 public:
-    ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const ColumnNumbers & keys_,
-        AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_,
-        size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
-        Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
-        : aggregator(keys_, aggregates_, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
-            compiler_, min_count_to_compile_, group_by_two_level_threshold_),
-        final(final_), max_threads(std::min(inputs.size(), max_threads_)),
-        keys_size(keys_.size()), aggregates_size(aggregates_.size()),
-        handler(*this), processor(inputs, max_threads, handler)
-    {
-        children.insert(children.end(), inputs.begin(), inputs.end());
-    }
-
     /** Columns from key_names and the arguments of aggregate functions must already be computed.
       */
     ParallelAggregatingBlockInputStream(BlockInputStreams inputs, const Names & key_names,

@@ -41,7 +28,7 @@ public:
         : aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
             compiler_, min_count_to_compile_, group_by_two_level_threshold_),
         final(final_), max_threads(std::min(inputs.size(), max_threads_)),
-        keys_size(key_names.size()), aggregates_size(aggregates.size()),
+        keys_size(aggregator.getNumberOfKeys()), aggregates_size(aggregator.getNumberOfAggregates()),
         handler(*this), processor(inputs, max_threads, handler)
     {
         children.insert(children.end(), inputs.begin(), inputs.end());


@@ -134,6 +134,65 @@ struct ConvertImpl<DataTypeDateTime, DataTypeDate, Name>
 };

+/** A special case for converting UInt32 or UInt64 to Date.
+  * If the number is less than 65536, it is interpreted as a DayNum; if larger, as a unix timestamp.
+  * It is somewhat illogical that we essentially put two different functions into one.
+  * But it supports the common case when a user writes toDate(UInt32),
+  * expecting it to convert a unix timestamp to a date
+  * (otherwise such usage would be a widespread mistake).
+  */
+template <typename FromDataType, typename Name>
+struct ConvertImplUInt32Or64ToDate
+{
+    typedef typename FromDataType::FieldType FromFieldType;
+    typedef DataTypeDate::FieldType ToFieldType;
+
+    template <typename To, typename From>
+    static To convert(const From & from, const DateLUT & date_lut)
+    {
+        return from < 0xFFFF
+            ? from
+            : date_lut.toDayNum(from);
+    }
+
+    static void execute(Block & block, const ColumnNumbers & arguments, size_t result)
+    {
+        DateLUT & date_lut = DateLUT::instance();
+
+        if (const ColumnVector<FromFieldType> * col_from
+            = typeid_cast<const ColumnVector<FromFieldType> *>(&*block.getByPosition(arguments[0]).column))
+        {
+            ColumnVector<ToFieldType> * col_to = new ColumnVector<ToFieldType>;
+            block.getByPosition(result).column = col_to;
+
+            const typename ColumnVector<FromFieldType>::Container_t & vec_from = col_from->getData();
+            typename ColumnVector<ToFieldType>::Container_t & vec_to = col_to->getData();
+            size_t size = vec_from.size();
+            vec_to.resize(size);
+
+            for (size_t i = 0; i < size; ++i)
+                vec_to[i] = convert<ToFieldType>(vec_from[i], date_lut);
+        }
+        else if (const ColumnConst<FromFieldType> * col_from
+            = typeid_cast<const ColumnConst<FromFieldType> *>(&*block.getByPosition(arguments[0]).column))
+        {
+            block.getByPosition(result).column = new ColumnConst<ToFieldType>(col_from->size(),
+                convert<ToFieldType>(col_from->getData(), date_lut));
+        }
+        else
+            throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+                + " of first argument of function " + Name::name,
+                ErrorCodes::ILLEGAL_COLUMN);
+    }
+};
+
+template <typename Name>
+struct ConvertImpl<DataTypeUInt32, DataTypeDate, Name> : ConvertImplUInt32Or64ToDate<DataTypeUInt32, Name> {};
+
+template <typename Name>
+struct ConvertImpl<DataTypeUInt64, DataTypeDate, Name> : ConvertImplUInt32Or64ToDate<DataTypeUInt64, Name> {};
+
 /** Conversion of numbers, dates, and dates-with-time to strings: via formatting.
   */
 template <typename DataType> void formatImpl(typename DataType::FieldType x, WriteBuffer & wb) { writeText(x, wb); }
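A quick check of the dispatch threshold with concrete numbers (illustrative code, not part of the commit; the real DateLUT::toDayNum is timezone-aware, the stand-in below simply divides by 86400):

#include <cstdint>
#include <cassert>

// Stand-in for DateLUT::toDayNum: seconds since epoch -> days since epoch.
static uint32_t toDayNumStub(uint32_t timestamp) { return timestamp / 86400; }

// Mirrors ConvertImplUInt32Or64ToDate::convert.
static uint32_t convert(uint32_t from)
{
    return from < 0xFFFF ? from : toDayNumStub(from);
}

int main()
{
    assert(convert(16512) == 16512);       // small value: already a DayNum (2015-03-18)
    assert(convert(1426636800) == 16512);  // large value: unix timestamp of 2015-03-18 00:00 UTC
}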
@@ -392,9 +451,9 @@ public:
         IDataType * from_type = &*block.getByPosition(arguments[0]).type;

         if (typeid_cast<const DataTypeUInt8 * >(from_type)) ConvertImpl<DataTypeUInt8, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeUInt16 * >(from_type)) ConvertImpl<DataTypeUInt16, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeUInt32 * >(from_type)) ConvertImpl<DataTypeUInt32, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeUInt64 * >(from_type)) ConvertImpl<DataTypeUInt64, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeInt8 * >(from_type)) ConvertImpl<DataTypeInt8, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeInt16 * >(from_type)) ConvertImpl<DataTypeInt16, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeInt32 * >(from_type)) ConvertImpl<DataTypeInt32, ToDataType, Name>::execute(block, arguments, result);

@@ -403,7 +462,7 @@ public:
         else if (typeid_cast<const DataTypeFloat64 * >(from_type)) ConvertImpl<DataTypeFloat64, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeDate * >(from_type)) ConvertImpl<DataTypeDate, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeDateTime * >(from_type)) ConvertImpl<DataTypeDateTime, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeString * >(from_type)) ConvertImpl<DataTypeString, ToDataType, Name>::execute(block, arguments, result);
         else if (typeid_cast<const DataTypeFixedString *>(from_type)) ConvertImpl<DataTypeFixedString, ToDataType, Name>::execute(block, arguments, result);
         else
             throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),


@@ -64,6 +64,9 @@ public:
         pos = ptr + offset;
     }

+    /// get the entire internal buffer
+    inline Buffer & internalBuffer() { return internal_buffer; }
+
     /// get the part of the buffer from which data can be read / into which data can be written
     inline Buffer & buffer() { return working_buffer; }


@@ -0,0 +1,69 @@
+#pragma once
+
+#include <DB/IO/ReadBuffer.h>
+#include <DB/IO/BufferWithOwnMemory.h>
+#include <statdaemons/AIO.h>
+
+#include <string>
+#include <limits>
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace DB
+{
+
+/** Class for asynchronous reading of data.
+  * All sizes and offsets must be multiples of DEFAULT_AIO_FILE_BLOCK_SIZE bytes.
+  */
+class ReadBufferAIO : public BufferWithOwnMemory<ReadBuffer>
+{
+public:
+    ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
+        char * existing_memory_ = nullptr);
+    ~ReadBufferAIO() override;
+
+    ReadBufferAIO(const ReadBufferAIO &) = delete;
+    ReadBufferAIO & operator=(const ReadBufferAIO &) = delete;
+
+    void setMaxBytes(size_t max_bytes_read_);
+    off_t seek(off_t off, int whence = SEEK_SET);
+    off_t getPositionInFile();
+    std::string getFileName() const noexcept { return filename; }
+    int getFD() const noexcept { return fd; }
+
+private:
+    off_t getPositionInFileRelaxed() const noexcept;
+    bool nextImpl();
+    /// Wait for the current asynchronous task to complete.
+    void waitForAIOCompletion();
+    /// Swap the main buffer and the duplicate buffer.
+    void swapBuffers() noexcept;
+
+private:
+    /// The buffer for asynchronous data read operations.
+    BufferWithOwnMemory<ReadBuffer> fill_buffer;
+
+    iocb request;
+    std::vector<iocb *> request_ptrs;
+    std::vector<io_event> events;
+
+    AIOContext aio_context;
+
+    const std::string filename;
+
+    size_t max_bytes_read = std::numeric_limits<size_t>::max();
+    size_t total_bytes_read = 0;
+    off_t pos_in_file = 0;
+    int fd = -1;
+
+    /// The asynchronous read operation has not yet completed.
+    bool is_pending_read = false;
+    /// An exception was thrown.
+    bool got_exception = false;
+    /// The end of the file has been reached.
+    bool is_eof = false;
+    /// At least one asynchronous read request has been issued.
+    bool is_started = false;
+};
+
+}
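For readers unfamiliar with the kernel interface this class wraps, here is a self-contained sketch of the same submit/wait cycle using raw Linux libaio (assumes Linux and linking with -laio; the file name and single-block handling are illustrative, not from the commit):

#include <libaio.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cstdlib>

int main()
{
    const size_t block = 512;   // cf. DEFAULT_AIO_FILE_BLOCK_SIZE
    void * buf = nullptr;
    assert(posix_memalign(&buf, block, block) == 0);   // O_DIRECT requires an aligned buffer

    int fd = ::open("data.bin", O_RDONLY | O_DIRECT);  // hypothetical input file
    assert(fd != -1);

    io_context_t ctx = nullptr;
    assert(io_setup(1, &ctx) == 0);                    // room for one in-flight request

    iocb request;
    io_prep_pread(&request, fd, buf, block, 0);        // read the first block at offset 0
    iocb * request_ptrs[1] = { &request };
    assert(io_submit(ctx, 1, request_ptrs) == 1);      // returns immediately

    /* ... the caller is free to do useful work here ... */

    io_event event;
    assert(io_getevents(ctx, 1, 1, &event, nullptr) == 1);  // block until completion
    assert(static_cast<long>(event.res) >= 0);              // event.res = bytes read

    io_destroy(ctx);
    ::close(fd);
    std::free(buf);
}

This is evidently why the class keeps a second fill_buffer and a swapBuffers() helper: one buffer is consumed by the caller while the next request fills the other.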


@@ -99,7 +99,7 @@ public:
         }
     }

-    size_t getPositionInFile()
+    off_t getPositionInFile()
     {
         return pos_in_file - (working_buffer.end() - pos);
     }


@@ -0,0 +1,65 @@
+#pragma once
+
+#include <DB/IO/WriteBuffer.h>
+#include <DB/IO/BufferWithOwnMemory.h>
+#include <statdaemons/AIO.h>
+
+#include <string>
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace DB
+{
+
+/** Class for asynchronous writing of data.
+  * All sizes and offsets must be multiples of DEFAULT_AIO_FILE_BLOCK_SIZE bytes.
+  */
+class WriteBufferAIO : public BufferWithOwnMemory<WriteBuffer>
+{
+public:
+    WriteBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
+        char * existing_memory_ = nullptr);
+    ~WriteBufferAIO() override;
+
+    WriteBufferAIO(const WriteBufferAIO &) = delete;
+    WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
+
+    off_t seek(off_t off, int whence = SEEK_SET);
+    off_t getPositionInFile();
+    void truncate(off_t length = 0);
+    void sync();
+    std::string getFileName() const noexcept { return filename; }
+    int getFD() const noexcept { return fd; }
+
+private:
+    /// If data still remains in the buffer, write it out.
+    void flush();
+    /// Write out the next portion of data from the buffer.
+    void nextImpl();
+    /// Wait for the current asynchronous task to complete.
+    void waitForAIOCompletion();
+    /// Swap the main buffer and the duplicate buffer.
+    void swapBuffers() noexcept;
+
+private:
+    /// The buffer for asynchronous data write operations.
+    BufferWithOwnMemory<WriteBuffer> flush_buffer;
+
+    iocb request;
+    std::vector<iocb *> request_ptrs;
+    std::vector<io_event> events;
+
+    AIOContext aio_context;
+
+    const std::string filename;
+
+    off_t pos_in_file = 0;
+    int fd = -1;
+
+    /// The asynchronous write operation has not yet completed.
+    bool is_pending_write = false;
+    /// An exception was thrown.
+    bool got_exception = false;
+};
+
+}


@@ -528,9 +528,7 @@ struct AggregatedDataVariants : private boost::noncopyable
     void init(Type type_)
     {
-        type = type_;
-
-        switch (type)
+        switch (type_)
         {
             case Type::EMPTY:       break;
             case Type::without_key: break;

@@ -543,6 +541,8 @@ struct AggregatedDataVariants : private boost::noncopyable
             default:
                 throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
         }
+
+        type = type_;
     }

     size_t size() const
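Assigning `type` only after the switch has validated (and allocated) the variant means that a throw on an unknown type leaves the object in its previous, consistent state: a small strong-exception-safety improvement.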
@@ -676,19 +676,6 @@ APPLY_FOR_AGGREGATED_VARIANTS(M)
 class Aggregator
 {
 public:
-    Aggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_,
-        size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_,
-        size_t group_by_two_level_threshold_)
-        : keys(keys_), aggregates(aggregates_), aggregates_size(aggregates.size()),
-        overflow_row(overflow_row_),
-        max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
-        compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_)
-    {
-        std::sort(keys.begin(), keys.end());
-        keys.erase(std::unique(keys.begin(), keys.end()), keys.end());
-        keys_size = keys.size();
-    }
-
     Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_,
         size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_,
         size_t group_by_two_level_threshold_)

@@ -739,6 +726,9 @@ public:
     /// For IBlockInputStream.
     String getID() const;

+    size_t getNumberOfKeys() const { return keys_size; }
+    size_t getNumberOfAggregates() const { return aggregates_size; }
+
 protected:
     friend struct AggregatedDataVariants;


@@ -218,6 +218,9 @@ private:
     /// Eliminates injective function calls and constant expressions from group by statement
     void optimizeGroupBy();

+    /// Remove duplicate items from ORDER BY.
+    void optimizeOrderBy();
+
     /// Turn an enumeration of values or a subquery into an ASTSet. node is an `in` or `notIn` function.
     void makeSet(ASTFunction * node, const Block & sample_block);


@@ -109,6 +109,12 @@ struct Settings
     \
     /** Minimum length of an expr = x1 OR ... expr = xN expression for the optimization */ \
     M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \
+    \
+    /** Minimum number of bytes for input/output operations to bypass the page cache */ \
+    M(SettingUInt64, min_bytes_to_use_direct_io, (20U * 1024U * 1024U * 1024U)) \
+    \
+    /** Throw an exception if there is an index by date and it is not used. */ \
+    M(SettingBool, force_index_by_date, 0) \

     /// All sorts of limits on query execution.
     Limits limits;
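One thing worth noting about the new default: 20 GiB does not fit into 32 bits, and with plain U suffixes each operand is an unsigned int on the usual LP64 platforms, so the product wraps modulo 2^32. Since 20 x 2^30 is exactly 5 x 2^32, the expression evaluates to 0; a ULL suffix would be needed for the constant to mean what the comment says. A quick check (assumes 32-bit unsigned int):

#include <cstdio>

int main()
{
    unsigned int wrapped = 20U * 1024U * 1024U * 1024U;       // 5 * 2^32 mod 2^32 == 0
    unsigned long long intended = 20ULL * 1024 * 1024 * 1024; // 21474836480
    std::printf("%u %llu\n", wrapped, intended);              // prints "0 21474836480"
}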


@@ -47,29 +47,19 @@ public:
     /// The main name of the table type (for example, StorageMergeTree).
     virtual std::string getName() const = 0;

-    /** Returns true if the storage receives data from a remote server or servers.
-      */
+    /** Returns true if the storage receives data from a remote server or servers. */
     virtual bool isRemote() const { return false; }

-    virtual void storeExternalTables(const std::map<String, StoragePtr> & tables_)
-    {
-        throw Exception("Method storeExternalTables is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
-    }
-
-    /** Returns true if the storage supports queries with the SAMPLE section.
-      */
+    /** Returns true if the storage supports queries with the SAMPLE section. */
     virtual bool supportsSampling() const { return false; }

-    /** Returns true if the storage supports queries with the FINAL section.
-      */
+    /** Returns true if the storage supports queries with the FINAL section. */
     virtual bool supportsFinal() const { return false; }

-    /** Returns true if the storage supports queries with the PREWHERE section.
-      */
+    /** Returns true if the storage supports queries with the PREWHERE section. */
     virtual bool supportsPrewhere() const { return false; }

-    /** Returns true if the storage supports multiple replicas.
-      */
+    /** Returns true if the storage supports multiple replicas. */
    virtual bool supportsParallelReplicas() const { return false; }

    /** Does not allow changing the table description (including renaming and dropping the table).


@@ -55,6 +55,8 @@ namespace DB
  *    (see CollapsingSortedBlockInputStream.h)
  *  - Summing - when merging parts, if the PK matches, sum all numeric columns that are not in the PK.
  *  - Aggregating - when merging parts, if the PK matches, merge the states of aggregate-function columns.
+ *  - Unsorted - when merging parts, the data is not ordered but merely concatenated;
+ *    - this allows reading the data in exactly the same batches in which it was written.
  */

@@ -399,18 +401,22 @@ public:
             }

             size_t key_size = storage.sort_descr.size();

-            index.resize(key_size * size);
-
-            String index_path = storage.full_path + name + "/primary.idx";
-            ReadBufferFromFile index_file(index_path,
-                std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
-
-            for (size_t i = 0; i < size; ++i)
-                for (size_t j = 0; j < key_size; ++j)
-                    storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
-
-            if (!index_file.eof())
-                throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
+            if (key_size)
+            {
+                index.resize(key_size * size);
+
+                String index_path = storage.full_path + name + "/primary.idx";
+                ReadBufferFromFile index_file(index_path,
+                    std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
+
+                for (size_t i = 0; i < size; ++i)
+                    for (size_t j = 0; j < key_size; ++j)
+                        storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
+
+                if (!index_file.eof())
+                    throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
+            }

             size_in_bytes = calcTotalSize(storage.full_path + name + "/");
         }

@@ -468,7 +474,7 @@ public:
             if (!checksums.empty())
             {
-                if (!checksums.files.count("primary.idx"))
+                if (!storage.sort_descr.empty() && !checksums.files.count("primary.idx"))
                     throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);

                 if (require_part_metadata)

@@ -486,12 +492,14 @@ public:
             }
             else
             {
-                /// Check that the primary key is not empty.
-
-                Poco::File index_file(path + "/primary.idx");
-
-                if (!index_file.exists() || index_file.getSize() == 0)
-                    throw Exception("Part " + path + " is broken: primary key is empty.", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
+                if (!storage.sort_descr.empty())
+                {
+                    /// Check that the primary key is not empty.
+                    Poco::File index_file(path + "/primary.idx");
+
+                    if (!index_file.exists() || index_file.getSize() == 0)
+                        throw Exception("Part " + path + " is broken: primary key is empty.", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
+                }

                 /// Check that all marks are non-empty and have the same size.

@@ -620,6 +628,7 @@ public:
         Collapsing,
         Summing,
         Aggregating,
+        Unsorted,
     };

     static void doNothing(const String & name) {}

@@ -628,7 +637,7 @@ public:
       * (the correctness of names and paths is not checked)
       * consisting of the specified columns.
       *
-      * primary_expr_ast      - the expression for sorting;
+      * primary_expr_ast      - the expression for sorting; empty for UnsortedMergeTree.
       * date_column_name      - the name of the column with the date;
       * index_granularity     - how many rows one index value is written for.
      * require_part_metadata - whether checksums.txt and columns.txt must be present in the part directory
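Taken together: an UnsortedMergeTree part has an empty sort description and therefore no primary.idx at all, which is evidently why loading the index, checking its checksum, and verifying its non-emptiness above all became conditional on storage.sort_descr being non-empty.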


@@ -15,18 +15,32 @@ namespace DB
 class IMergedBlockOutputStream : public IBlockOutputStream
 {
 public:
-    IMergedBlockOutputStream(MergeTreeData & storage_, size_t min_compress_block_size_, size_t max_compress_block_size_) : storage(storage_), index_offset(0), min_compress_block_size(min_compress_block_size_), max_compress_block_size(max_compress_block_size_)
+    IMergedBlockOutputStream(
+        MergeTreeData & storage_,
+        size_t min_compress_block_size_,
+        size_t max_compress_block_size_,
+        CompressionMethod compression_method_)
+        : storage(storage_),
+        min_compress_block_size(min_compress_block_size_),
+        max_compress_block_size(max_compress_block_size_),
+        compression_method(compression_method_)
     {
     }

 protected:
-    typedef std::set<std::string> OffsetColumns;
+    using OffsetColumns = std::set<std::string>;

     struct ColumnStream
     {
-        ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path, size_t max_compress_block_size = DEFAULT_MAX_COMPRESS_BLOCK_SIZE) :
+        ColumnStream(
+            const String & escaped_column_name_,
+            const String & data_path,
+            const std::string & marks_path,
+            size_t max_compress_block_size,
+            CompressionMethod compression_method) :
             escaped_column_name(escaped_column_name_),
             plain_file(data_path, max_compress_block_size, O_TRUNC | O_CREAT | O_WRONLY),
-            plain_hashing(plain_file), compressed_buf(plain_hashing), compressed(compressed_buf),
+            plain_hashing(plain_file), compressed_buf(plain_hashing, compression_method), compressed(compressed_buf),
             marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file) {}

         String escaped_column_name;

@@ -70,7 +84,7 @@ protected:
         }
     };

-    typedef std::map<String, std::unique_ptr<ColumnStream> > ColumnStreams;
+    using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;

     void addStream(const String & path, const String & name, const IDataType & type, size_t level = 0, String filename = "")
     {

@@ -92,7 +106,8 @@ protected:
                 escaped_size_name,
                 path + escaped_size_name + ".bin",
                 path + escaped_size_name + ".mrk",
-                max_compress_block_size));
+                max_compress_block_size,
+                compression_method));

             addStream(path, name, *type_arr->getNestedType(), level + 1);
         }

@@ -101,7 +116,8 @@ protected:
                 escaped_column_name,
                 path + escaped_column_name + ".bin",
                 path + escaped_column_name + ".mrk",
-                max_compress_block_size));
+                max_compress_block_size,
+                compression_method));
     }

@@ -146,7 +162,8 @@ protected:
                 type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);

-                stream.compressed.nextIfAtEnd();    /// So that, instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
+                /// So that, instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
+                stream.compressed.nextIfAtEnd();

                 prev_mark += limit;
             }

@@ -180,7 +197,8 @@ protected:
                 type.serializeBinary(column, stream.compressed, prev_mark, limit);

-                stream.compressed.nextIfAtEnd();    /// So that, instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
+                /// So that, instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
+                stream.compressed.nextIfAtEnd();

                 prev_mark += limit;
             }

@@ -192,10 +210,12 @@ protected:
     ColumnStreams column_streams;

     /// The offset to the first row of the block for which the index must be written.
-    size_t index_offset;
+    size_t index_offset = 0;

     size_t min_compress_block_size;
     size_t max_compress_block_size;
+
+    CompressionMethod compression_method;
 };
/** For writing a single part. The data is already sorted, belongs to a single month, and is written as one part.
  */

@@ -203,13 +223,23 @@
 class MergedBlockOutputStream : public IMergedBlockOutputStream
 {
 public:
-    MergedBlockOutputStream(MergeTreeData & storage_, String part_path_, const NamesAndTypesList & columns_list_)
-        : IMergedBlockOutputStream(storage_, storage_.context.getSettings().min_compress_block_size, storage_.context.getSettings().max_compress_block_size), columns_list(columns_list_), part_path(part_path_), marks_count(0)
+    MergedBlockOutputStream(
+        MergeTreeData & storage_,
+        String part_path_,
+        const NamesAndTypesList & columns_list_,
+        CompressionMethod compression_method)
+        : IMergedBlockOutputStream(
+            storage_, storage_.context.getSettings().min_compress_block_size,
+            storage_.context.getSettings().max_compress_block_size, compression_method),
+        columns_list(columns_list_), part_path(part_path_)
     {
         Poco::File(part_path).createDirectories();

-        index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
-        index_stream = new HashingWriteBuffer(*index_file_stream);
+        if (storage.mode != MergeTreeData::Unsorted)
+        {
+            index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
+            index_stream = new HashingWriteBuffer(*index_file_stream);
+        }

         for (const auto & it : columns_list)
             addStream(part_path, it.name, *it.type);

@@ -233,7 +263,9 @@
             {
                 for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
                 {
-                    index_vec.push_back((*(*it)->column)[i]);
+                    if (storage.mode != MergeTreeData::Unsorted)
+                        index_vec.push_back((*(*it)->column)[i]);
+
                     (*it)->type->serializeBinary(index_vec.back(), *index_stream);
                 }

@@ -264,9 +296,13 @@
         /// Finish writing and fetch the checksums.
         MergeTreeData::DataPart::Checksums checksums;

-        index_stream->next();
-        checksums.files["primary.idx"].file_size = index_stream->count();
-        checksums.files["primary.idx"].file_hash = index_stream->getHash();
+        if (storage.mode != MergeTreeData::Unsorted)
+        {
+            index_stream->next();
+            checksums.files["primary.idx"].file_size = index_stream->count();
+            checksums.files["primary.idx"].file_hash = index_stream->getHash();
+            index_stream = nullptr;
+        }

         for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
         {

@@ -274,7 +310,6 @@
             it->second->addToChecksums(checksums);
         }

-        index_stream = nullptr;
         column_streams.clear();

         if (marks_count == 0)

@@ -315,7 +350,7 @@ private:
     NamesAndTypesList columns_list;
     String part_path;

-    size_t marks_count;
+    size_t marks_count = 0;

     SharedPtr<WriteBufferFromFile> index_file_stream;
     SharedPtr<HashingWriteBuffer> index_stream;

@@ -328,8 +363,11 @@ typedef Poco::SharedPtr<MergedBlockOutputStream> MergedBlockOutputStreamPtr;
 class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
 {
 public:
-    MergedColumnOnlyOutputStream(MergeTreeData & storage_, String part_path_, bool sync_ = false) :
-        IMergedBlockOutputStream(storage_, storage_.context.getSettings().min_compress_block_size, storage_.context.getSettings().max_compress_block_size), part_path(part_path_), initialized(false), sync(sync_)
+    MergedColumnOnlyOutputStream(MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method)
+        : IMergedBlockOutputStream(
+            storage_, storage_.context.getSettings().min_compress_block_size,
+            storage_.context.getSettings().max_compress_block_size, compression_method),
+        part_path(part_path_), sync(sync_)
     {
     }

@@ -386,8 +424,7 @@
 private:
     String part_path;

-    bool initialized;
+    bool initialized = false;
     bool sync;
 };


@@ -55,8 +55,6 @@ public:
     bool hasColumn(const String & column_name) const override;

     bool isRemote() const override { return true; }
-    /// Save the temporary tables so that on the next call of the read method they can be sent to the remote servers.
-    void storeExternalTables(const Tables & tables_) override { external_tables = tables_; }

     BlockInputStreams read(
         const Names & column_names,

@@ -121,10 +119,6 @@ private:
     Context & context;

-    /// Temporary tables that need to be sent to the servers. The variable is cleared after each call of the read method.
-    /// The storeExternalTables method must be used to prepare for sending.
-    Tables external_tables;
-
     /// Used only if the table must own a Cluster object that nobody else owns - to implement TableFunctionRemote.
     SharedPtr<Cluster> owned_cluster;


@@ -13,7 +13,7 @@
 #include <DB/DataTypes/DataTypesNumberFixed.h>
 #include <zkutil/ZooKeeper.h>
 #include <zkutil/LeaderElection.h>
-#include <statdaemons/threadpool.hpp>
+

 namespace DB
 {


@@ -56,9 +56,9 @@ public:
     Benchmark(unsigned concurrency_, double delay_,
             const String & host_, UInt16 port_, const String & default_database_,
             const String & user_, const String & password_, const Settings & settings_)
-        : concurrency(concurrency_), delay(delay_), queue(concurrency), pool(concurrency),
+        : concurrency(concurrency_), delay(delay_), queue(concurrency),
         connections(concurrency, host_, port_, default_database_, user_, password_, data_type_factory),
-        settings(settings_)
+        settings(settings_), pool(concurrency)
     {
         std::cerr << std::fixed << std::setprecision(3);

@@ -78,8 +78,6 @@ private:
     typedef ConcurrentBoundedQueue<Query> Queue;
     Queue queue;

-    boost::threadpool::pool pool;
-
     DataTypeFactory data_type_factory;
     ConnectionPool connections;
     Settings settings;

@@ -123,6 +121,8 @@ private:
     Poco::FastMutex mutex;

+    boost::threadpool::pool pool;
+
     void readQueries()
     {
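Moving the pool to the end of the member list is not cosmetic: members are destroyed in reverse declaration order, so declaring `pool` last means it is destroyed first, and its worker threads are joined while the `queue`, `connections`, and `mutex` they use are still alive. (That is presumably the point of the move.)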


@@ -162,11 +162,37 @@ void Connection::forceConnected()
     }
 }

+struct PingTimeoutSetter
+{
+    PingTimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & ping_timeout_)
+        : socket(socket_), ping_timeout(ping_timeout_)
+    {
+        old_send_timeout = socket.getSendTimeout();
+        old_receive_timeout = socket.getReceiveTimeout();
+
+        if (old_send_timeout > ping_timeout)
+            socket.setSendTimeout(ping_timeout);
+        if (old_receive_timeout > ping_timeout)
+            socket.setReceiveTimeout(ping_timeout);
+    }
+
+    ~PingTimeoutSetter()
+    {
+        socket.setSendTimeout(old_send_timeout);
+        socket.setReceiveTimeout(old_receive_timeout);
+    }
+
+    Poco::Net::StreamSocket & socket;
+    Poco::Timespan ping_timeout;
+    Poco::Timespan old_send_timeout;
+    Poco::Timespan old_receive_timeout;
+};
+
 bool Connection::ping()
 {
     //LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");

+    PingTimeoutSetter timeout_setter(socket, ping_timeout);
     try
     {
         UInt64 pong = 0;
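PingTimeoutSetter is a small RAII guard: it tightens the socket timeouts to ping_timeout only when the current values are looser, and its destructor restores the previous timeouts even if ping() exits via an exception.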


@@ -43,7 +43,7 @@ void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index)
     Block block = stream->read();
     if (!block)
         return;
-    MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column, &output_blocks));
+    MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks));
     ++blocks_fetched;
     queue.push(Cursor(merging_block));
 }

@@ -123,9 +123,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
             /// All streams are over. Process the last key.
             if (!has_next)
-            {
                 commitCurrent();
-            }

             break;
         }

@@ -136,7 +134,6 @@ Block CollapsingFinalBlockInputStream::readImpl()
         if (has_next && !(next < current))
         {
             queue.push(current);
-
             break;
         }
     }


@@ -226,7 +226,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
         size_t rows_processed = process_list_elem->progress.rows;
         size_t bytes_processed = process_list_elem->progress.bytes;

-        size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows);
+        size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress.total_rows);

         /** Check the limits on the amount of data to read, the query execution speed, and the quota on the amount of data to read.
           * NOTE: Maybe it makes sense to have them checked right in ProcessList?

@@ -253,16 +253,32 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
                 throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
         }

-        if (limits.min_execution_speed)
+        size_t total_rows = process_list_elem->progress.total_rows;
+
+        if (limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
         {
             double total_elapsed = info.total_stopwatch.elapsedSeconds();

-            if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
-                && rows_processed / total_elapsed < limits.min_execution_speed)
+            if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0)
             {
-                throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
-                    + " rows/sec., minimum: " + toString(limits.min_execution_speed),
-                    ErrorCodes::TOO_SLOW);
+                if (limits.min_execution_speed && rows_processed / total_elapsed < limits.min_execution_speed)
+                    throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
+                        + " rows/sec., minimum: " + toString(limits.min_execution_speed),
+                        ErrorCodes::TOO_SLOW);
+
+                size_t total_rows = process_list_elem->progress.total_rows;
+
+                /// If the predicted execution time is longer than max_execution_time.
+                if (limits.max_execution_time != 0 && total_rows)
+                {
+                    double estimated_execution_time_seconds = total_elapsed * (static_cast<double>(total_rows) / rows_processed);
+
+                    if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
+                        throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+                            + " is too long. Maximum: " + toString(limits.max_execution_time.totalSeconds())
+                            + ". Estimated rows to process: " + toString(total_rows),
+                            ErrorCodes::TOO_SLOW);
+                }
             }
         }
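To see the new predicted-time check with concrete numbers: if after total_elapsed = 10 s a query has processed rows_processed = 1 million out of total_rows = 100 million, the estimate is 10 x (100e6 / 1e6) = 1000 s; with max_execution_time = 300 the query is now cancelled after 10 s instead of running for the full five minutes and only then failing.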


@@ -23,16 +23,18 @@
 int main(int argc, char ** argv)
 {
+    using namespace DB;
+
     try
     {
         size_t n = argc == 2 ? atoi(argv[1]) : 10;

-        DB::Block block;
+        Block block;

-        DB::ColumnWithNameAndType column_x;
+        ColumnWithNameAndType column_x;
         column_x.name = "x";
-        column_x.type = new DB::DataTypeInt16;
-        DB::ColumnInt16 * x = new DB::ColumnInt16;
+        column_x.type = new DataTypeInt16;
+        ColumnInt16 * x = new ColumnInt16;
         column_x.column = x;
         auto & vec_x = x->getData();

@@ -44,65 +46,64 @@ int main(int argc, char ** argv)
         const char * strings[] = {"abc", "def", "abcd", "defg", "ac"};

-        DB::ColumnWithNameAndType column_s1;
+        ColumnWithNameAndType column_s1;
         column_s1.name = "s1";
-        column_s1.type = new DB::DataTypeString;
-        column_s1.column = new DB::ColumnString;
+        column_s1.type = new DataTypeString;
+        column_s1.column = new ColumnString;

         for (size_t i = 0; i < n; ++i)
             column_s1.column->insert(std::string(strings[i % 5]));

         block.insert(column_s1);

-        DB::ColumnWithNameAndType column_s2;
+        ColumnWithNameAndType column_s2;
         column_s2.name = "s2";
-        column_s2.type = new DB::DataTypeString;
-        column_s2.column = new DB::ColumnString;
+        column_s2.type = new DataTypeString;
+        column_s2.column = new ColumnString;

         for (size_t i = 0; i < n; ++i)
             column_s2.column->insert(std::string(strings[i % 3]));

         block.insert(column_s2);

-        DB::ColumnNumbers key_column_numbers;
-        key_column_numbers.push_back(0);
-        //key_column_numbers.push_back(1);
+        Names key_column_names;
+        key_column_names.emplace_back("x");

-        DB::AggregateFunctionFactory factory;
+        AggregateFunctionFactory factory;

-        DB::AggregateDescriptions aggregate_descriptions(1);
+        AggregateDescriptions aggregate_descriptions(1);

-        DB::DataTypes empty_list_of_types;
+        DataTypes empty_list_of_types;
         aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);

-        Poco::SharedPtr<DB::DataTypes> result_types = new DB::DataTypes
+        Poco::SharedPtr<DataTypes> result_types = new DataTypes
         {
-            new DB::DataTypeInt16,
-        //  new DB::DataTypeString,
-            new DB::DataTypeUInt64,
+            new DataTypeInt16,
+        //  new DataTypeString,
+            new DataTypeUInt64,
         };

-        DB::Block sample;
-        for (DB::DataTypes::const_iterator it = result_types->begin(); it != result_types->end(); ++it)
+        Block sample;
+        for (DataTypes::const_iterator it = result_types->begin(); it != result_types->end(); ++it)
         {
-            DB::ColumnWithNameAndType col;
+            ColumnWithNameAndType col;
             col.type = *it;
             sample.insert(col);
         }

-        DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
-        stream = new DB::AggregatingBlockInputStream(stream, key_column_numbers, aggregate_descriptions, false, true,
-            0, DB::OverflowMode::THROW, nullptr, 0, 0);
+        BlockInputStreamPtr stream = new OneBlockInputStream(block);
+        stream = new AggregatingBlockInputStream(stream, key_column_names, aggregate_descriptions, false, true,
+            0, OverflowMode::THROW, nullptr, 0, 0);

-        DB::WriteBufferFromOStream ob(std::cout);
-        DB::RowOutputStreamPtr row_out = new DB::TabSeparatedRowOutputStream(ob, sample);
-        DB::BlockOutputStreamPtr out = new DB::BlockOutputStreamFromRowOutputStream(row_out);
+        WriteBufferFromOStream ob(std::cout);
+        RowOutputStreamPtr row_out = new TabSeparatedRowOutputStream(ob, sample);
+        BlockOutputStreamPtr out = new BlockOutputStreamFromRowOutputStream(row_out);

         {
             Poco::Stopwatch stopwatch;
             stopwatch.start();

-            DB::copyData(*stream, *out);
+            copyData(*stream, *out);

             stopwatch.stop();
             std::cout << std::fixed << std::setprecision(2)

@@ -115,7 +116,7 @@ int main(int argc, char ** argv)
         stream->dumpTree(std::cout);
         std::cout << std::endl;
     }
-    catch (const DB::Exception & e)
+    catch (const Exception & e)
     {
         std::cerr << e.displayText() << std::endl;
     }

View File

@@ -0,0 +1,222 @@
#include <DB/IO/ReadBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Defines.h>
#include <algorithm>
#include <limits>
#include <sys/types.h>
#include <sys/stat.h>
namespace DB
{
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
fill_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
request_ptrs{ &request }, events(1), filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
int open_flags = (flags_ == -1) ? O_RDONLY : flags_;
open_flags |= O_DIRECT;
fd = ::open(filename.c_str(), open_flags, mode_);
if (fd == -1)
{
got_exception = true;
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
throwFromErrno("Cannot open file " + filename, error_code);
}
::memset(&request, 0, sizeof(request));
}
ReadBufferAIO::~ReadBufferAIO()
{
if (!got_exception)
{
try
{
waitForAIOCompletion();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (fd != -1)
::close(fd);
}
void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
{
if (is_started)
{
got_exception = true;
throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR);
}
if ((max_bytes_read_ % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Invalid maximum number of bytes to read from file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
max_bytes_read = max_bytes_read_;
}
off_t ReadBufferAIO::seek(off_t off, int whence)
{
if ((off % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid offset for ReadBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
waitForAIOCompletion();
off_t new_pos;
if (whence == SEEK_SET)
{
if (off < 0)
{
got_exception = true;
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
new_pos = off;
}
else if (whence == SEEK_CUR)
{
if (off >= 0)
{
if (off > (std::numeric_limits<off_t>::max() - getPositionInFileRelaxed()))
{
got_exception = true;
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
}
else if (off < -getPositionInFileRelaxed())
{
got_exception = true;
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
new_pos = getPositionInFileRelaxed() + off;
}
else
{
got_exception = true;
throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (new_pos != getPositionInFileRelaxed())
{
off_t working_buffer_begin_pos = pos_in_file - static_cast<off_t>(working_buffer.size());
if (hasPendingData() && (new_pos >= working_buffer_begin_pos) && (new_pos <= pos_in_file))
{
/// Moved, but stayed within the buffer.
pos = working_buffer.begin() + (new_pos - working_buffer_begin_pos);
}
else
{
pos = working_buffer.end();
pos_in_file = new_pos;
}
}
return new_pos;
}
off_t ReadBufferAIO::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
off_t ReadBufferAIO::getPositionInFileRelaxed() const noexcept
{
return pos_in_file - (working_buffer.end() - pos);
}
bool ReadBufferAIO::nextImpl()
{
/// If the end of file had already been reached when this function was called,
/// then the current call is erroneous.
if (is_eof)
return false;
waitForAIOCompletion();
/// On the first call, there is no need to swap the main and duplicate buffers.
if (is_started)
swapBuffers();
else
is_started = true;
/// If the end of file has just been reached, do nothing more.
if (is_eof)
return true;
/// Create the request.
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(fill_buffer.internalBuffer().begin());
request.aio_nbytes = std::min(fill_buffer.internalBuffer().size(), max_bytes_read);
request.aio_offset = pos_in_file;
request.aio_reqprio = 0;
/// Submit the request.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR);
}
is_pending_read = true;
return true;
}
void ReadBufferAIO::waitForAIOCompletion()
{
if (is_pending_read)
{
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
is_pending_read = false;
off_t bytes_read = events[0].res;
if (bytes_read < 0)
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
if ((bytes_read % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Received unaligned number of bytes from file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
pos_in_file += bytes_read;
total_bytes_read += bytes_read;
if (bytes_read > 0)
fill_buffer.buffer().resize(bytes_read);
if ((static_cast<size_t>(bytes_read) < fill_buffer.internalBuffer().size()) || (total_bytes_read == max_bytes_read))
is_eof = true;
}
}
void ReadBufferAIO::swapBuffers() noexcept
{
internalBuffer().swap(fill_buffer.internalBuffer());
buffer().swap(fill_buffer.buffer());
std::swap(position(), fill_buffer.position());
}
}
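A usage sketch for the new reader, pieced together from the tests later in this commit; the path is a placeholder, and both the internal buffer size and any seek offsets must be multiples of DEFAULT_AIO_FILE_BLOCK_SIZE, as the checks above enforce:

    #include <DB/IO/ReadBufferAIO.h>
    #include <DB/Core/Defines.h>
    #include <iostream>
    #include <string>

    int main()
    {
        /// "/tmp/some_file" is a hypothetical path.
        DB::ReadBufferAIO in("/tmp/some_file", 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);

        std::string data;
        data.resize(2 * DEFAULT_AIO_FILE_BLOCK_SIZE);

        /// Reads are served from the working buffer while the fill buffer
        /// is populated by the next asynchronous request.
        size_t count = in.read(&data[0], data.size());
        std::cout << "read " << count << " bytes, now at " << in.getPositionInFile() << "\n";
    }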

View File

@@ -0,0 +1,202 @@
#include <DB/IO/WriteBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Defines.h>
#include <limits>
#include <sys/types.h>
#include <sys/stat.h>
namespace DB
{
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
request_ptrs{ &request }, events(1), filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
int open_flags = (flags_ == -1) ? (O_WRONLY | O_TRUNC | O_CREAT) : flags_;
open_flags |= O_DIRECT;
fd = ::open(filename.c_str(), open_flags, mode_);
if (fd == -1)
{
got_exception = true;
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
throwFromErrno("Cannot open file " + filename, error_code);
}
::memset(&request, 0, sizeof(request));
}
WriteBufferAIO::~WriteBufferAIO()
{
if (!got_exception)
{
try
{
flush();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (fd != -1)
::close(fd);
}
off_t WriteBufferAIO::seek(off_t off, int whence)
{
if ((off % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid offset for WriteBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
flush();
if (whence == SEEK_SET)
{
if (off < 0)
{
got_exception = true;
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
pos_in_file = off;
}
else if (whence == SEEK_CUR)
{
if (off >= 0)
{
if (off > (std::numeric_limits<off_t>::max() - pos_in_file))
{
got_exception = true;
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
}
else if (off < -pos_in_file)
{
got_exception = true;
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
pos_in_file += off;
}
else
{
got_exception = true;
throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
return pos_in_file;
}
off_t WriteBufferAIO::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
void WriteBufferAIO::truncate(off_t length)
{
if ((length % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid length for WriteBufferAIO::ftruncate", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
flush();
int res = ::ftruncate(fd, length);
if (res == -1)
{
got_exception = true;
throwFromErrno("Cannot truncate file " + filename, ErrorCodes::CANNOT_TRUNCATE_FILE);
}
}
void WriteBufferAIO::sync()
{
flush();
/// Ask the OS to flush the data to disk.
int res = ::fsync(fd);
if (res == -1)
{
got_exception = true;
throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC);
}
}
void WriteBufferAIO::flush()
{
next();
waitForAIOCompletion();
}
void WriteBufferAIO::nextImpl()
{
if (!offset())
return;
waitForAIOCompletion();
swapBuffers();
/// Create the request.
request.aio_lio_opcode = IOCB_CMD_PWRITE;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(flush_buffer.buffer().begin());
request.aio_nbytes = flush_buffer.offset();
request.aio_offset = pos_in_file;
request.aio_reqprio = 0;
if ((request.aio_nbytes % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Illegal attempt to write unaligned data to file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
/// Submit the request.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR);
}
is_pending_write = true;
}
void WriteBufferAIO::waitForAIOCompletion()
{
if (is_pending_write)
{
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
is_pending_write = false;
off_t bytes_written = events[0].res;
if ((bytes_written < 0) || (static_cast<size_t>(bytes_written) < flush_buffer.offset()))
{
got_exception = true;
throw Exception("Asynchronous write error on file " + filename, ErrorCodes::AIO_WRITE_ERROR);
}
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_written))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
pos_in_file += bytes_written;
}
}
void WriteBufferAIO::swapBuffers() noexcept
{
buffer().swap(flush_buffer.buffer());
std::swap(position(), flush_buffer.position());
}
}
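A matching sketch for the writer, based on the tests below; the path is again a placeholder. Note that the amount flushed per request must stay block-aligned, otherwise nextImpl() throws AIO_UNALIGNED_SIZE_ERROR:

    #include <DB/IO/WriteBufferAIO.h>
    #include <DB/Core/Defines.h>
    #include <string>

    int main()
    {
        /// "/tmp/some_file" is a hypothetical path.
        DB::WriteBufferAIO out("/tmp/some_file", 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);

        std::string data(DEFAULT_AIO_FILE_BLOCK_SIZE, 'A');
        out.write(&data[0], data.length());    /// buffered in the main buffer

        out.sync();    /// flush() the pending request, then fsync, as defined above
    }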

View File

@@ -0,0 +1,337 @@
#include <DB/IO/ReadBufferAIO.h>
#include <DB/Core/Defines.h>
#include <boost/filesystem.hpp>
#include <vector>
#include <iostream>
#include <fstream>
#include <functional>
#include <cstdlib>
#include <unistd.h>
namespace
{
void run();
void prepare(std::string & directory, std::string & filename, std::string & buf);
void die(const std::string & msg);
void run_test(unsigned int num, const std::function<bool()> func);
bool test1(const std::string & filename);
bool test2(const std::string & filename, const std::string & buf);
bool test3(const std::string & filename, const std::string & buf);
bool test4(const std::string & filename, const std::string & buf);
bool test5(const std::string & filename);
bool test6(const std::string & filename, const std::string & buf);
bool test7(const std::string & filename, const std::string & buf);
bool test8(const std::string & filename);
bool test9(const std::string & filename, const std::string & buf);
bool test10(const std::string & filename, const std::string & buf);
bool test11(const std::string & filename);
bool test12(const std::string & filename, const std::string & buf);
void run()
{
namespace fs = boost::filesystem;
std::string directory;
std::string filename;
std::string buf;
prepare(directory, filename, buf);
const std::vector<std::function<bool()> > tests =
{
std::bind(test1, std::ref(filename)),
std::bind(test2, std::ref(filename), std::ref(buf)),
std::bind(test3, std::ref(filename), std::ref(buf)),
std::bind(test4, std::ref(filename), std::ref(buf)),
std::bind(test5, std::ref(filename)),
std::bind(test6, std::ref(filename), std::ref(buf)),
std::bind(test7, std::ref(filename), std::ref(buf)),
std::bind(test8, std::ref(filename)),
std::bind(test9, std::ref(filename), std::ref(buf)),
std::bind(test10, std::ref(filename), std::ref(buf)),
std::bind(test11, std::ref(filename)),
std::bind(test12, std::ref(filename), std::ref(buf))
};
unsigned int num = 0;
for (const auto & test : tests)
{
++num;
run_test(num, test);
}
fs::remove_all(directory);
}
void prepare(std::string & directory, std::string & filename, std::string & buf)
{
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
directory = std::string(dir);
filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
std::ofstream out(filename.c_str());
if (!out.is_open())
die("Could not open file");
out << buf;
}
void die(const std::string & msg)
{
std::cout << msg << "\n";
::exit(EXIT_FAILURE);
}
void run_test(unsigned int num, const std::function<bool()> func)
{
bool ok;
try
{
ok = func();
}
catch (const DB::Exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.displayText() << "\n";
}
catch (const std::exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.what() << "\n";
}
if (ok)
std::cout << "Test " << num << " passed\n";
else
std::cout << "Test " << num << " failed\n";
}
bool test1(const std::string & filename)
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (in.getFileName() != filename)
return false;
if (in.getFD() == -1)
return false;
return true;
}
bool test2(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
return (newbuf == buf);
}
bool test3(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
size_t requested = 9 * DEFAULT_AIO_FILE_BLOCK_SIZE;
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
in.setMaxBytes(requested);
size_t count = in.read(&newbuf[0], newbuf.length());
newbuf.resize(count);
return (newbuf == buf.substr(0, requested));
}
bool test4(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
in.setMaxBytes(0);
size_t n_read = in.read(&newbuf[0], newbuf.length());
return n_read == 0;
}
bool test5(const std::string & filename)
{
bool ok = false;
try
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
in.setMaxBytes(DEFAULT_AIO_FILE_BLOCK_SIZE >> 1);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test6(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (in.getPositionInFile() != 0)
return false;
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
if (static_cast<size_t>(in.getPositionInFile()) != buf.length())
return false;
return true;
}
bool test7(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length() - DEFAULT_AIO_FILE_BLOCK_SIZE);
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
(void) in.seek(DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_SET);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != (9 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
return (newbuf == buf.substr(DEFAULT_AIO_FILE_BLOCK_SIZE));
}
bool test8(const std::string & filename)
{
bool ok = false;
try
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
(void) in.seek(DEFAULT_AIO_FILE_BLOCK_SIZE + 1, SEEK_CUR);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test9(const std::string & filename, const std::string & buf)
{
bool ok = false;
try
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
in.setMaxBytes(9 * DEFAULT_AIO_FILE_BLOCK_SIZE);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test10(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count1 = in.read(&newbuf[0], newbuf.length());
if (count1 != newbuf.length())
return false;
if (newbuf != buf.substr(0, 4 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
(void) in.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
size_t count2 = in.read(&newbuf[0], newbuf.length());
if (count2 != newbuf.length())
return false;
if (newbuf != buf.substr(6 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
return true;
}
bool test11(const std::string & filename)
{
bool ok = false;
try
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
(void) in.seek(-DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_SET);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test12(const std::string & filename, const std::string & buf)
{
bool ok = false;
try
{
std::string newbuf;
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
(void) in.seek(-(10 * DEFAULT_AIO_FILE_BLOCK_SIZE), SEEK_CUR);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
}
int main()
{
run();
return 0;
}

View File

@@ -0,0 +1,291 @@
#include <DB/IO/WriteBufferAIO.h>
#include <DB/Core/Defines.h>
#include <boost/filesystem.hpp>
#include <iostream>
#include <fstream>
#include <streambuf>
#include <functional>
#include <vector>
#include <cstdlib>
namespace
{
void run();
void die(const std::string & msg);
void run_test(unsigned int num, const std::function<bool()> func);
bool test1();
bool test2();
bool test3();
bool test4();
void run()
{
const std::vector<std::function<bool()> > tests =
{
test1,
test2,
test3,
test4
};
unsigned int num = 0;
for (const auto & test : tests)
{
++num;
run_test(num, test);
}
}
void die(const std::string & msg)
{
std::cout << msg << "\n";
::exit(EXIT_FAILURE);
}
void run_test(unsigned int num, const std::function<bool()> func)
{
bool ok;
try
{
ok = func();
}
catch (const DB::Exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.displayText() << "\n";
}
catch (const std::exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.what() << "\n";
}
if (ok)
std::cout << "Test " << num << " passed\n";
else
std::cout << "Test " << num << " failed\n";
}
bool test1()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length());
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
return (received == buf);
}
bool test2()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length() / 2);
out.seek(DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
out.write(&buf[buf.length() / 2], buf.length() / 2);
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
if (received.substr(0, buf.length() / 2) != buf.substr(0, buf.length() / 2))
return false;
if (received.substr(buf.length() / 2, DEFAULT_AIO_FILE_BLOCK_SIZE) != std::string(DEFAULT_AIO_FILE_BLOCK_SIZE, '\0'))
return false;
if (received.substr(buf.length() / 2 + DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(buf.length() / 2))
return false;
return true;
}
bool test3()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length());
off_t pos1 = out.getPositionInFile();
out.truncate(buf.length() / 2);
off_t pos2 = out.getPositionInFile();
if (pos1 != pos2)
return false;
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
return (received == buf.substr(0, buf.length() / 2));
}
bool test4()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length());
off_t pos1 = out.getPositionInFile();
out.truncate(3 * buf.length() / 2);
off_t pos2 = out.getPositionInFile();
if (pos1 != pos2)
return false;
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
if (received.substr(0, buf.length()) != buf)
return false;
if (received.substr(buf.length()) != std::string(buf.length() / 2, '\0'))
return false;
return true;
}
}
int main()
{
run();
return 0;
}

View File

@@ -984,17 +984,28 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
    std::vector<std::packaged_task<Block()>> tasks;
    tasks.reserve(Method::Data::NUM_BUCKETS);

    try
    {
        for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
        {
            if (method.data.impls[bucket].empty())
                continue;

            tasks.emplace_back(std::bind(converter, bucket, current_memory_tracker));

            if (thread_pool)
                thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
            else
                tasks[bucket]();
        }
    }
    catch (...)
    {
        /// If this is not done, then in case of an exception, tasks will be destroyed before the threads have finished, which is bad.
        if (thread_pool)
            thread_pool->wait();

        throw;
    }

    if (thread_pool)

@@ -1112,13 +1123,13 @@ void NO_INLINE Aggregator::mergeDataImpl(
            for (size_t i = 0; i < aggregates_size; ++i)
                aggregate_functions[i]->destroy(
                    Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
        }
        else
        {
            res_it->second = it->second;
        }

        Method::getAggregateData(it->second) = nullptr;
    }
}

@@ -1197,14 +1208,25 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
    std::vector<std::packaged_task<void()>> tasks;
    tasks.reserve(Method::Data::NUM_BUCKETS);

    try
    {
        for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
        {
            tasks.emplace_back(std::bind(merge_bucket, bucket, current_memory_tracker));

            if (thread_pool)
                thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
            else
                tasks[bucket]();
        }
    }
    catch (...)
    {
        /// If this is not done, then in case of an exception, tasks will be destroyed before the threads have finished, which is bad.
        if (thread_pool)
            thread_pool->wait();

        throw;
    }

    if (thread_pool)

@@ -1492,11 +1514,6 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
    {
        LOG_TRACE(log, "Merging partially aggregated two-level data.");

        auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
        {
            current_memory_tracker = memory_tracker;

@@ -1520,6 +1537,11 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
        std::vector<std::packaged_task<void()>> tasks;
        tasks.reserve(bucket_to_blocks.size() - has_blocks_with_unknown_bucket);

        std::unique_ptr<boost::threadpool::pool> thread_pool;
        if (max_threads > 1 && total_input_rows > 100000    /// TODO Make the threshold configurable.
            && has_two_level)
            thread_pool.reset(new boost::threadpool::pool(max_threads));

        for (auto & bucket_blocks : bucket_to_blocks)
        {
            auto bucket = bucket_blocks.first;
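The try/catch added in these hunks is about lifetime: each scheduled lambda holds a reference into `tasks`, so if scheduling throws midway, the pool has to be drained before the vector is destroyed. A stripped-down sketch of the same pattern, with std::thread standing in for boost::threadpool (an assumption; the real pool also reuses threads):

    #include <future>
    #include <thread>
    #include <vector>

    int main()
    {
        std::vector<std::packaged_task<int()>> tasks;
        std::vector<std::thread> workers;

        /// Reserve up front so scheduling never reallocates under running
        /// workers, just as the code above reserves NUM_BUCKETS.
        tasks.reserve(4);
        workers.reserve(4);

        try
        {
            for (int i = 0; i < 4; ++i)
            {
                tasks.emplace_back([i] { return i * i; });
                workers.emplace_back([i, &tasks] { tasks[i](); });
            }
        }
        catch (...)
        {
            /// Drain before `tasks` goes out of scope, otherwise the threads
            /// would touch a destroyed vector.
            for (auto & w : workers)
                w.join();
            throw;
        }

        for (auto & w : workers)
            w.join();
    }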

View File

@@ -85,6 +85,9 @@ void ExpressionAnalyzer::init()
    /// GROUP BY injective function elimination.
    optimizeGroupBy();

    /// Remove duplicate elements from ORDER BY.
    optimizeOrderBy();

    /// array_join_alias_to_name, array_join_result_to_source.
    getArrayJoinedColumns();

@@ -162,11 +165,13 @@ void ExpressionAnalyzer::analyzeAggregation()
            }

            NameAndTypePair key{column_name, col.type};

            /// Aggregation keys are uniquified.
            if (!unique_keys.count(key.name))
            {
                unique_keys.insert(key.name);
                aggregation_keys.push_back(key);

                /// key is no longer needed, therefore we can save a little by moving it
                aggregated_columns.push_back(std::move(key));
            }

@@ -529,6 +534,38 @@ void ExpressionAnalyzer::optimizeGroupBy()
}

void ExpressionAnalyzer::optimizeOrderBy()
{
    if (!(select_query && select_query->order_expression_list))
        return;

    /// Uniquify the sort conditions.

    using NameAndLocale = std::pair<std::string, std::string>;
    std::set<NameAndLocale> elems_set;

    ASTs & elems = select_query->order_expression_list->children;
    ASTs unique_elems;
    unique_elems.reserve(elems.size());

    for (const auto & elem : elems)
    {
        String name = elem->children.front()->getColumnName();
        const ASTOrderByElement & order_by_elem = typeid_cast<const ASTOrderByElement &>(*elem);

        if (elems_set.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(name),
            std::forward_as_tuple(order_by_elem.collator ? order_by_elem.collator->getLocale() : std::string())).second)
        {
            unique_elems.emplace_back(elem);
        }
    }

    if (unique_elems.size() < elems.size())
        elems = unique_elems;
}

void ExpressionAnalyzer::makeSetsForIndex()
{
    if (storage && ast && storage->supportsIndexForIn())

@@ -1306,6 +1343,7 @@ void ExpressionAnalyzer::getAggregates(ASTPtr ast, ExpressionActionsPtr & action
        AggregateDescription aggregate;
        aggregate.column_name = node->getColumnName();

        /// Aggregate functions are uniquified.
        for (size_t i = 0; i < aggregate_descriptions.size(); ++i)
            if (aggregate_descriptions[i].column_name == aggregate.column_name)
                return;

@@ -1711,7 +1749,8 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
void ExpressionAnalyzer::getAggregateInfo(Names & key_names, AggregateDescriptions & aggregates)
{
    for (NamesAndTypesList::iterator it = aggregation_keys.begin(); it != aggregation_keys.end(); ++it)
        key_names.emplace_back(it->name);

    aggregates = aggregate_descriptions;
}
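optimizeOrderBy() treats an element as a duplicate only if both the column name and the collation locale repeat, so "ORDER BY s, s COLLATE 'tr'" keeps both entries. A standalone sketch of the deduplication with plain pairs standing in for AST elements:

    #include <iostream>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    int main()
    {
        using NameAndLocale = std::pair<std::string, std::string>;

        /// (column name, collation locale); assumed sample data.
        std::vector<NameAndLocale> elems{{"n", ""}, {"n", ""}, {"s", "tr"}, {"s", ""}, {"s", "tr"}};

        std::set<NameAndLocale> seen;
        std::vector<NameAndLocale> unique_elems;

        for (const auto & e : elems)
            if (seen.emplace(e).second)    /// first occurrence only
                unique_elems.push_back(e);

        std::cout << unique_elems.size() << "\n";    /// 3: (n,), (s,tr), (s,)
    }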

View File

@@ -664,8 +664,15 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
     *  then we use limit + offset as the block size (so as not to read more from the table than requested),
     *  and also set the number of threads to 1 and cancel asynchronous execution of the query pipeline.
     */
    if (!query.distinct
        && !query.prewhere_expression
        && !query.where_expression
        && !query.group_expression_list
        && !query.having_expression
        && !query.order_expression_list
        && query.limit_length
        && !query_analyzer->hasAggregation()
        && limit_length + limit_offset < settings.max_block_size)
    {
        settings.max_block_size = limit_length + limit_offset;
        settings.max_threads = 1;

@@ -678,12 +685,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
    /// Initialize the initial data streams to which the query transformations are applied. Table or subquery?
    if (!interpreter_subquery)
    {
        streams = storage->read(required_columns, query_ptr,
            context, settings_for_storage, from_stage,
            settings.max_block_size, settings.max_threads);

@@ -817,12 +818,10 @@ static SortDescription getSortDescription(ASTSelectQuery & query)
{
    SortDescription order_descr;
    order_descr.reserve(query.order_expression_list->children.size());
    for (const auto & elem : query.order_expression_list->children)
    {
        String name = elem->children.front()->getColumnName();
        const ASTOrderByElement & order_by_elem = typeid_cast<const ASTOrderByElement &>(*elem);
        order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.collator);
    }
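The arithmetic of the shortcut above, with assumed numbers: for a bare "LIMIT 5, 10" the pipeline never needs more than limit + offset = 15 rows, so reading a default-sized block (for example 65536 rows) would be wasted work:

    #include <cstddef>
    #include <iostream>

    int main()
    {
        size_t limit_length = 10, limit_offset = 5;    /// assumed query: LIMIT 5, 10
        size_t max_block_size = 65536;                 /// assumed default

        if (limit_length + limit_offset < max_block_size)
            max_block_size = limit_length + limit_offset;

        std::cout << "block size used: " << max_block_size << "\n";    /// 15
    }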

View File

@@ -170,11 +170,20 @@ bool Set::insertFromBlock(const Block & block, bool create_ordered_set)
    ConstColumnPlainPtrs key_columns(keys_size);
    data_types.resize(keys_size);

    /// Constant columns to the right of IN are not supported directly. To handle them, they are materialized first.
    Columns materialized_columns;

    /// Remember the columns we will work with
    for (size_t i = 0; i < keys_size; ++i)
    {
        key_columns[i] = block.getByPosition(i).column;
        data_types[i] = block.getByPosition(i).type;

        if (key_columns[i]->isConst())
        {
            materialized_columns.emplace_back(static_cast<const IColumnConst *>(key_columns[i])->convertToFullColumn());
            key_columns[i] = materialized_columns.back().get();
        }
    }

    size_t rows = block.rows();
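Conceptually (this is not the real IColumnConst interface), a constant column stores a single value plus a row count, and materializing expands it into a full column so the insertion loop can treat every key column uniformly:

    #include <cstddef>
    #include <iostream>
    #include <vector>

    /// Toy stand-in for a constant column.
    struct ConstColumn
    {
        int value;
        size_t rows;

        std::vector<int> convertToFullColumn() const
        {
            return std::vector<int>(rows, value);    /// one copy per row
        }
    };

    int main()
    {
        ConstColumn c{42, 5};
        auto full = c.convertToFullColumn();
        std::cout << full.size() << " rows, first = " << full[0] << "\n";    /// 5 rows, first = 42
    }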

View File

@@ -62,9 +62,9 @@ int main(int argc, char ** argv)
    DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
    DB::AggregatedDataVariants aggregated_data_variants;

    DB::Names key_column_names;
    key_column_names.emplace_back("x");
    key_column_names.emplace_back("s1");

    DB::AggregateFunctionFactory factory;

@@ -73,7 +73,7 @@ int main(int argc, char ** argv)
    DB::DataTypes empty_list_of_types;
    aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);

    DB::Aggregator aggregator(key_column_names, aggregate_descriptions, false, 0, DB::OverflowMode::THROW, nullptr, 0, 0);

    {
        Poco::Stopwatch stopwatch;

View File

@@ -247,8 +247,8 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
    catch (Exception & e)
    {
        std::stringstream s;
        s << "Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
            << ", Stack trace:\n\n" << e.getStackTrace().toString();
        LOG_ERROR(log, s.str());
        trySendExceptionToClient(s, request, response, used_output);
    }

@@ -257,18 +257,21 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
        std::stringstream s;
        s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
            << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
        LOG_ERROR(log, s.str());
        trySendExceptionToClient(s, request, response, used_output);
    }
    catch (std::exception & e)
    {
        std::stringstream s;
        s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
        LOG_ERROR(log, s.str());
        trySendExceptionToClient(s, request, response, used_output);
    }
    catch (...)
    {
        std::stringstream s;
        s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
        LOG_ERROR(log, s.str());
        trySendExceptionToClient(s, request, response, used_output);
    }
}

View File

@@ -716,7 +716,7 @@ void TCPHandler::run()
    }
    catch (Exception & e)
    {
        LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
            << ", Stack trace:\n\n" << e.getStackTrace().toString());
    }
    catch (Poco::Exception & e)

View File

@@ -44,7 +44,7 @@ MergeTreeData::MergeTreeData(
    date_column_name(date_column_name_), sampling_expression(sampling_expression_),
    index_granularity(index_granularity_),
    mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_),
    settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
    require_part_metadata(require_part_metadata_),
    full_path(full_path_), columns(columns_),
    broken_part_callback(broken_part_callback_),

@@ -89,18 +89,23 @@ MergeTreeData::MergeTreeData(
    Poco::File(full_path).createDirectories();
    Poco::File(full_path + "detached").createDirectory();

    if (primary_expr_ast)
    {
        /// Initialize the sort description.
        sort_descr.reserve(primary_expr_ast->children.size());
        for (const ASTPtr & ast : primary_expr_ast->children)
        {
            String name = ast->getColumnName();
            sort_descr.push_back(SortColumnDescription(name, 1));
        }

        primary_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(false);

        ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(true);
        primary_key_sample = projected_expr->getSampleBlock();
    }
    else if (mode != Unsorted)
        throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
}

UInt64 MergeTreeData::getMaxDataPartIndex()

@@ -120,6 +125,7 @@ std::string MergeTreeData::getModePrefix() const
    case Collapsing: return "Collapsing";
    case Summing: return "Summing";
    case Aggregating: return "Aggregating";
    case Unsorted: return "Unsorted";

    default:
        throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);

@@ -386,8 +392,14 @@ void MergeTreeData::checkAlter(const AlterCommands & params)
    /// The list of columns that must not be touched.
    /// sampling_expression need not be taken into account, because it must be contained in the primary key.

    Names keys;

    if (primary_expr)
        keys = primary_expr->getRequiredColumns();

    keys.push_back(sign_column);
    std::sort(keys.begin(), keys.end());

    for (const AlterCommand & command : params)

@@ -510,7 +522,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
    BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/',
        DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "", false);
    ExpressionBlockInputStream in(part_in, expression);
    MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4);
    in.readPrefix();
    out.writePrefix();

View File

@@ -8,6 +8,7 @@
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>

namespace DB

@@ -335,8 +336,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
            __sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes);
        });

        if (data.mode != MergeTreeData::Unsorted)
            src_streams.push_back(new MaterializingBlockInputStream{
                new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())});
        else
            src_streams.push_back(input.release());

        sum_rows_approx += parts[i]->size * data.index_granularity;
    }

@@ -363,13 +368,17 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
            merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::Unsorted:
            merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
            break;

        default:
            throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
    }

    const String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";

    MergedBlockOutputStream to{data, new_part_tmp_path, union_columns, CompressionMethod::LZ4};

    merged_stream->readPrefix();
    to.writePrefix();
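The difference the Unsorted branch introduces: the sorted modes merge the source streams by the sort key, while ConcatBlockInputStream simply appends them. A sketch with plain vectors standing in for block streams:

    #include <iostream>
    #include <vector>

    int main()
    {
        std::vector<int> part1{1, 4, 7};
        std::vector<int> part2{2, 3, 9};

        /// ConcatBlockInputStream analogue: one part after another.
        std::vector<int> concatenated;
        concatenated.insert(concatenated.end(), part1.begin(), part1.end());
        concatenated.insert(concatenated.end(), part2.begin(), part2.end());

        for (int v : concatenated)
            std::cout << v << ' ';    /// 1 4 7 2 3 9 - original order, not globally sorted
        std::cout << '\n';
    }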

View File

@@ -83,6 +83,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
    PKCondition key_condition(query, context, data.getColumnsList(), data.getSortDescription());
    PKCondition date_condition(query, context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));

    if (settings.force_index_by_date && date_condition.alwaysTrue())
        throw Exception("Index by date is not used and setting 'force_index_by_date' is set.", ErrorCodes::INDEX_NOT_USED);

    /// Select the parts that may contain data satisfying date_condition and that match the condition on _part.
    {
        auto prev_parts = parts;

@@ -268,7 +271,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
    for (auto & part : parts)
    {
        RangesInDataPart ranges(part, (*part_index)++);

        if (data.mode != MergeTreeData::Unsorted)
            ranges.ranges = markRangesFromPkRange(part->index, key_condition, settings);
        else
            ranges.ranges = MarkRanges{MarkRange{0, part->size}};

        if (!ranges.ranges.empty())
        {

@@ -281,7 +288,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
    }

    LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
        << sum_marks << " marks to read from " << sum_ranges << " ranges");

    BlockInputStreams res;

View File

@@ -94,15 +94,18 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
    new_data_part->is_temp = true;

    /// If some columns need to be computed for sorting, do it.
    if (data.mode != MergeTreeData::Unsorted)
        data.getPrimaryExpression()->execute(block);

    SortDescription sort_descr = data.getSortDescription();

    /// Sort.
    if (data.mode != MergeTreeData::Unsorted)
        stableSortBlock(block, sort_descr);

    NamesAndTypesList columns = data.getColumnsList().filter(block.getColumnsList().getNames());
    MergedBlockOutputStream out(data, part_tmp_path, columns, CompressionMethod::LZ4);

    out.getIndex().reserve(part_size * sort_descr.size());

    out.writePrefix();

View File

@@ -166,6 +166,8 @@ BlockInputStreams StorageDistributed::read(
    if (settings.limits.max_network_bandwidth)
        throttler.reset(new Throttler(settings.limits.max_network_bandwidth));

    Tables external_tables = context.getExternalTables();

    /// Loop over the shards.
    for (auto & conn_pool : cluster.pools)
        res.emplace_back(new RemoteBlockInputStream{

@@ -177,9 +179,6 @@ BlockInputStreams StorageDistributed::read(
    {
        DB::Context new_context = context;
        new_context.setSettings(new_settings);

        for (size_t i = 0; i < cluster.getLocalNodesNum(); ++i)
        {

@@ -193,7 +192,6 @@ BlockInputStreams StorageDistributed::read(
        }
    }

    return res;
}

View File

@@ -350,7 +350,7 @@ StoragePtr StorageFactory::get(
    }
    else if (endsWith(name, "MergeTree"))
    {
        /** Engines [Replicated][|Summing|Collapsing|Aggregating|Unsorted]MergeTree (2 * 5 combinations)
          * The following must be specified as the engine arguments:
          *  - (for Replicated) the path to the table in ZooKeeper
          *  - (for Replicated) the replica name in ZooKeeper

@@ -367,6 +367,7 @@ StoragePtr StorageFactory::get(
          * CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign)
          * SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
          * AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
          * UnsortedMergeTree(date, index_granularity)  TODO Add a description below.
          */

        const char * verbose_help = R"(

@@ -445,6 +446,8 @@ For further info please read the documentation: http://clickhouse.yandex-team.ru
            mode = MergeTreeData::Summing;
        else if (name_part == "Aggregating")
            mode = MergeTreeData::Aggregating;
        else if (name_part == "Unsorted")
            mode = MergeTreeData::Unsorted;
        else if (!name_part.empty())
            throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE);

@@ -458,7 +461,26 @@ For further info please read the documentation: http://clickhouse.yandex-team.ru
        /// NOTE Slightly confusing.
        size_t num_additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing);

        if (mode == MergeTreeData::Unsorted
            && args.size() != num_additional_params + 2)
        {
            String params;

            if (replicated)
                params +=
                    "\npath in ZooKeeper,"
                    "\nreplica name,";

            params +=
                "\nname of column with date,"
                "\nindex granularity\n";

            throw Exception("Storage " + name + " requires "
                + toString(num_additional_params + 2) + " parameters: " + params + verbose_help,
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
        }

        if (mode != MergeTreeData::Summing && mode != MergeTreeData::Unsorted
            && args.size() != num_additional_params + 3
            && args.size() != num_additional_params + 4)
        {

@@ -579,9 +601,10 @@ For further info please read the documentation: http://clickhouse.yandex-team.ru
        else
            throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);

        if (mode != MergeTreeData::Unsorted)
            primary_expr_list = extractPrimaryKey(args[1]);

        auto ast = typeid_cast<ASTLiteral *>(&*args.back());
        if (ast && ast->value.getType() == Field::Types::UInt64)
            index_granularity = safeGet<UInt64>(ast->value);
        else

View File

@@ -12,6 +12,7 @@ fi
# Create directories for the second server's data.
PATH2=/tmp/clickhouse/
[ -d "$PATH2" ] && rm -rf $PATH2
mkdir -p ${PATH2}{data,metadata}/default/

# Create a second config with port 9001.

@@ -47,6 +48,7 @@ PID=$!
function finish {
    kill $PID
    wait
}
trap finish EXIT

@@ -104,5 +106,3 @@ $CLIENT1 -n --query="
$CLIENT2 -n --query="
    DROP TABLE test.half1;
    DROP TABLE test.half2;"

View File

@@ -0,0 +1,256 @@
0 0 0
1 1 1
10 10 10
100 100 100
101 101 101
102 102 102
103 103 103
104 104 104
105 105 105
106 106 106
107 107 107
108 108 108
109 109 109
11 11 11
110 110 110
111 111 111
112 112 112
113 113 113
114 114 114
115 115 115
116 116 116
117 117 117
118 118 118
119 119 119
12 12 12
120 120 120
121 121 121
122 122 122
123 123 123
124 124 124
125 125 125
126 126 126
127 127 127
128 128 128
129 129 129
13 13 13
130 130 130
131 131 131
132 132 132
133 133 133
134 134 134
135 135 135
136 136 136
137 137 137
138 138 138
139 139 139
14 14 14
140 140 140
141 141 141
142 142 142
143 143 143
144 144 144
145 145 145
146 146 146
147 147 147
148 148 148
149 149 149
15 15 15
150 150 150
151 151 151
152 152 152
153 153 153
154 154 154
155 155 155
156 156 156
157 157 157
158 158 158
159 159 159
16 16 16
160 160 160
161 161 161
162 162 162
163 163 163
164 164 164
165 165 165
166 166 166
167 167 167
168 168 168
169 169 169
17 17 17
170 170 170
171 171 171
172 172 172
173 173 173
174 174 174
175 175 175
176 176 176
177 177 177
178 178 178
179 179 179
18 18 18
180 180 180
181 181 181
182 182 182
183 183 183
184 184 184
185 185 185
186 186 186
187 187 187
188 188 188
189 189 189
19 19 19
190 190 190
191 191 191
192 192 192
193 193 193
194 194 194
195 195 195
196 196 196
197 197 197
198 198 198
199 199 199
2 2 2
20 20 20
200 200 200
201 201 201
202 202 202
203 203 203
204 204 204
205 205 205
206 206 206
207 207 207
208 208 208
209 209 209
21 21 21
210 210 210
211 211 211
212 212 212
213 213 213
214 214 214
215 215 215
216 216 216
217 217 217
218 218 218
219 219 219
22 22 22
220 220 220
221 221 221
222 222 222
223 223 223
224 224 224
225 225 225
226 226 226
227 227 227
228 228 228
229 229 229
23 23 23
230 230 230
231 231 231
232 232 232
233 233 233
234 234 234
235 235 235
236 236 236
237 237 237
238 238 238
239 239 239
24 24 24
240 240 240
241 241 241
242 242 242
243 243 243
244 244 244
245 245 245
246 246 246
247 247 247
248 248 248
249 249 249
25 25 25
250 250 250
251 251 251
252 252 252
253 253 253
254 254 254
255 255 255
26 26 26
27 27 27
28 28 28
29 29 29
3 3 3
30 30 30
31 31 31
32 32 32
33 33 33
34 34 34
35 35 35
36 36 36
37 37 37
38 38 38
39 39 39
4 4 4
40 40 40
41 41 41
42 42 42
43 43 43
44 44 44
45 45 45
46 46 46
47 47 47
48 48 48
49 49 49
5 5 5
50 50 50
51 51 51
52 52 52
53 53 53
54 54 54
55 55 55
56 56 56
57 57 57
58 58 58
59 59 59
6 6 6
60 60 60
61 61 61
62 62 62
63 63 63
64 64 64
65 65 65
66 66 66
67 67 67
68 68 68
69 69 69
7 7 7
70 70 70
71 71 71
72 72 72
73 73 73
74 74 74
75 75 75
76 76 76
77 77 77
78 78 78
79 79 79
8 8 8
80 80 80
81 81 81
82 82 82
83 83 83
84 84 84
85 85 85
86 86 86
87 87 87
88 88 88
89 89 89
9 9 9
90 90 90
91 91 91
92 92 92
93 93 93
94 94 94
95 95 95
96 96 96
97 97 97
98 98 98
99 99 99

View File

@@ -0,0 +1,3 @@
SET max_rows_to_read = 1000000;
SET read_overflow_mode = 'break';
SELECT concat(toString(number % 256 AS n), '') AS s, n, max(s) FROM system.numbers_mt GROUP BY s, n, n, n, n, n, n, n, n, n ORDER BY s, n;
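The test above caps reading at one million rows with read_overflow_mode = 'break', which makes the server stop scanning at the limit and return what it has instead of throwing. Because number % 256 cycles every 256 rows, the first million rows of system.numbers_mt already contain every key, so the reference file lists all 256 groups, ordered lexicographically by the string s (0, 1, 10, 100, ...) rather than numerically. A minimal sketch of 'break' semantics (the count is approximate, since reading stops only at a block boundary at or past the limit):

    SET max_rows_to_read = 1000;
    SET read_overflow_mode = 'break';
    SELECT count() FROM system.numbers; -- returns a partial count instead of an error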

View File

@@ -0,0 +1 @@
SELECT n FROM (SELECT number AS n FROM system.numbers LIMIT 1000000) ORDER BY n, n, n, n, n, n, n, n, n, n LIMIT 1000000, 1;
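The subquery yields exactly 1,000,000 rows, and LIMIT 1000000, 1 asks for the single row at offset 1,000,000, one past the end, so the expected result is empty; the repeated n entries in ORDER BY additionally exercise handling of duplicated sort columns. The same shape at toy scale (an illustrative sketch, not part of the commit):

    SELECT n FROM (SELECT number AS n FROM system.numbers LIMIT 10)
    ORDER BY n LIMIT 10, 1; -- offset equals the row count, so no rows come back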

View File

@@ -0,0 +1,30 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0

View File

@@ -0,0 +1,31 @@
SELECT 1 IN (SELECT 1);
SELECT materialize(1) IN (SELECT 1);
SELECT 1 IN (SELECT materialize(1));
SELECT materialize(1) IN (SELECT materialize(1));
SELECT (1, 2) IN (SELECT 1, 2);
SELECT (1, materialize(2)) IN (SELECT 1, 2);
SELECT (1, materialize(2)) IN (SELECT materialize(1), 2);
SELECT (1, materialize(2), 'Hello') IN (SELECT materialize(1), 2, 'Hello');
SELECT (1, materialize(2), materialize('Hello')) IN (SELECT materialize(1), 2, 'Hello');
SELECT (1, materialize(2), materialize('Hello')) IN (SELECT materialize(1), 2, materialize('Hello'));
SELECT (1, materialize(2), 'Hello') IN (SELECT materialize(1), 2, materialize('Hello'));
SELECT 'Hello' IN (SELECT 'Hello');
SELECT materialize('Hello') IN (SELECT 'Hello');
SELECT 'Hello' IN (SELECT materialize('Hello'));
SELECT materialize('Hello') IN (SELECT materialize('Hello'));
SELECT 2 IN (SELECT 1);
SELECT materialize(2) IN (SELECT 1);
SELECT 2 IN (SELECT materialize(1));
SELECT materialize(2) IN (SELECT materialize(1));
SELECT (1, 3) IN (SELECT 1, 2);
SELECT (1, materialize(3)) IN (SELECT 1, 2);
SELECT (1, materialize(3)) IN (SELECT materialize(1), 2);
SELECT (1, materialize(2), 'World') IN (SELECT materialize(1), 2, 'Hello');
SELECT (1, materialize(2), materialize('World')) IN (SELECT materialize(1), 2, 'Hello');
SELECT (1, materialize(2), materialize('World')) IN (SELECT materialize(1), 2, materialize('Hello'));
SELECT (1, materialize(2), 'World') IN (SELECT materialize(1), 2, materialize('Hello'));
SELECT 'World' IN (SELECT 'Hello');
SELECT materialize('World') IN (SELECT 'Hello');
SELECT 'World' IN (SELECT materialize('Hello'));
SELECT materialize('World') IN (SELECT materialize('Hello'));
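These 30 queries pair with the reference above: the first 15 test scalar and tuple IN expressions whose left side is a member of the right-hand set (expected 1), and the last 15 repeat the same constant/materialize combinations with a non-member (expected 0). materialize() wraps a constant into a full column, so every combination also covers the non-constant execution path of IN. A two-line sketch of the distinction (illustrative only):

    SELECT 1 IN (SELECT 1);              -- constant left-hand side
    SELECT materialize(1) IN (SELECT 1); -- column left-hand side: forces per-row evaluation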