This commit is contained in:
Evgeniy Gatov 2015-03-14 03:33:19 +03:00
commit 38f22d4cc1
17 changed files with 1310 additions and 55 deletions

View File

@ -71,5 +71,8 @@
#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100 #define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100
/// Граница, на которых должны быть выровнены блоки для асинхронных файловых операций.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 512
#define ALWAYS_INLINE __attribute__((__always_inline__)) #define ALWAYS_INLINE __attribute__((__always_inline__))
#define NO_INLINE __attribute__((__noinline__)) #define NO_INLINE __attribute__((__noinline__))

View File

@ -279,6 +279,11 @@ namespace ErrorCodes
INFINITE_LOOP, INFINITE_LOOP,
CANNOT_COMPRESS, CANNOT_COMPRESS,
CANNOT_DECOMPRESS, CANNOT_DECOMPRESS,
AIO_SUBMIT_ERROR,
AIO_COMPLETION_ERROR,
AIO_READ_ERROR,
AIO_WRITE_ERROR,
AIO_UNALIGNED_SIZE_ERROR,
POCO_EXCEPTION = 1000, POCO_EXCEPTION = 1000,
STD_EXCEPTION, STD_EXCEPTION,

View File

@ -117,27 +117,19 @@ public:
/// Используется подмножество ограничений из Limits. /// Используется подмножество ограничений из Limits.
struct LocalLimits struct LocalLimits
{ {
LimitsMode mode; LimitsMode mode = LIMITS_CURRENT;
size_t max_rows_to_read; size_t max_rows_to_read = 0;
size_t max_bytes_to_read; size_t max_bytes_to_read = 0;
OverflowMode read_overflow_mode; OverflowMode read_overflow_mode = OverflowMode::THROW;
Poco::Timespan max_execution_time; Poco::Timespan max_execution_time = 0;
OverflowMode timeout_overflow_mode; OverflowMode timeout_overflow_mode = OverflowMode::THROW;
/// В строчках в секунду. /// В строчках в секунду.
size_t min_execution_speed; size_t min_execution_speed = 0;
/// Проверять, что скорость не слишком низкая, после прошествия указанного времени. /// Проверять, что скорость не слишком низкая, после прошествия указанного времени.
Poco::Timespan timeout_before_checking_execution_speed; Poco::Timespan timeout_before_checking_execution_speed = 0;
LocalLimits()
: mode(LIMITS_CURRENT),
max_rows_to_read(0), max_bytes_to_read(0), read_overflow_mode(OverflowMode::THROW),
max_execution_time(0), timeout_overflow_mode(OverflowMode::THROW),
min_execution_speed(0), timeout_before_checking_execution_speed(0)
{
}
}; };
/** Установить ограничения для проверки на каждый блок. */ /** Установить ограничения для проверки на каждый блок. */

View File

@ -64,6 +64,9 @@ public:
pos = ptr + offset; pos = ptr + offset;
} }
/// получить буфер
inline Buffer & internalBuffer() { return internal_buffer; }
/// получить часть буфера, из которого можно читать / в который можно писать данные /// получить часть буфера, из которого можно читать / в который можно писать данные
inline Buffer & buffer() { return working_buffer; } inline Buffer & buffer() { return working_buffer; }

View File

@ -0,0 +1,69 @@
#pragma once
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <statdaemons/AIO.h>
#include <string>
#include <limits>
#include <unistd.h>
#include <fcntl.h>
namespace DB
{
/** Класс для асинхронной чтения данных.
* Все размеры и смещения должны быть кратны DEFAULT_AIO_FILE_BLOCK_SIZE байтам.
*/
class ReadBufferAIO : public BufferWithOwnMemory<ReadBuffer>
{
public:
ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
char * existing_memory_ = nullptr);
~ReadBufferAIO() override;
ReadBufferAIO(const ReadBufferAIO &) = delete;
ReadBufferAIO & operator=(const ReadBufferAIO &) = delete;
void setMaxBytes(size_t max_bytes_read_);
off_t seek(off_t off, int whence = SEEK_SET);
off_t getPositionInFile();
std::string getFileName() const noexcept { return filename; }
int getFD() const noexcept { return fd; }
private:
off_t getPositionInFileRelaxed() const noexcept;
bool nextImpl();
/// Ждать окончания текущей асинхронной задачи.
void waitForAIOCompletion();
/// Менять местами основной и дублирующий буферы.
void swapBuffers() noexcept;
private:
/// Буфер для асинхронных операций чтения данных.
BufferWithOwnMemory<ReadBuffer> fill_buffer;
iocb request;
std::vector<iocb *> request_ptrs;
std::vector<io_event> events;
AIOContext aio_context;
const std::string filename;
size_t max_bytes_read = std::numeric_limits<size_t>::max();
size_t total_bytes_read = 0;
off_t pos_in_file = 0;
int fd = -1;
/// Асинхронная операция чтения ещё не завершилась.
bool is_pending_read = false;
/// Было получено исключение.
bool got_exception = false;
/// Конец файла достигнут.
bool is_eof = false;
/// Был отправлен хоть один запрос на асинхронную операцию чтения.
bool is_started = false;
};
}

View File

@ -99,7 +99,7 @@ public:
} }
} }
size_t getPositionInFile() off_t getPositionInFile()
{ {
return pos_in_file - (working_buffer.end() - pos); return pos_in_file - (working_buffer.end() - pos);
} }

View File

@ -0,0 +1,65 @@
#pragma once
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <statdaemons/AIO.h>
#include <string>
#include <unistd.h>
#include <fcntl.h>
namespace DB
{
/** Класс для асинхронной записи данных.
* Все размеры и смещения должны быть кратны DEFAULT_AIO_FILE_BLOCK_SIZE байтам.
*/
class WriteBufferAIO : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, mode_t mode_ = 0666,
char * existing_memory_ = nullptr);
~WriteBufferAIO() override;
WriteBufferAIO(const WriteBufferAIO &) = delete;
WriteBufferAIO & operator=(const WriteBufferAIO &) = delete;
off_t seek(off_t off, int whence = SEEK_SET);
off_t getPositionInFile();
void truncate(off_t length = 0);
void sync();
std::string getFileName() const noexcept { return filename; }
int getFD() const noexcept { return fd; }
private:
/// Если в буфере ещё остались данные - запишем их.
void flush();
///
void nextImpl();
/// Ждать окончания текущей асинхронной задачи.
void waitForAIOCompletion();
/// Менять местами основной и дублирующий буферы.
void swapBuffers() noexcept;
private:
/// Буфер для асинхронных операций записи данных.
BufferWithOwnMemory<WriteBuffer> flush_buffer;
iocb request;
std::vector<iocb *> request_ptrs;
std::vector<io_event> events;
AIOContext aio_context;
const std::string filename;
off_t pos_in_file = 0;
int fd = -1;
/// Асинхронная операция записи ещё не завершилась.
bool is_pending_write = false;
/// Было получено исключение.
bool got_exception = false;
};
}

View File

@ -109,6 +109,9 @@ struct Settings
\ \
/** Минимальная длина выражения expr = x1 OR ... expr = xN для оптимизации */ \ /** Минимальная длина выражения expr = x1 OR ... expr = xN для оптимизации */ \
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \ M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \
\
/** Минимальное количество байтов для операций ввода/ввывода минуя кэш страниц */ \
M(SettingUInt64, min_bytes_to_use_direct_io, (20U * 1024U * 1024U * 1024U)) \
/// Всевозможные ограничения на выполнение запроса. /// Всевозможные ограничения на выполнение запроса.
Limits limits; Limits limits;

View File

@ -55,6 +55,8 @@ namespace DB
* (см. CollapsingSortedBlockInputStream.h) * (см. CollapsingSortedBlockInputStream.h)
* - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK. * - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK.
* - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций. * - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций.
* - Unsorted - при склейке кусков, данные не упорядочиваются, а всего лишь конкатенируются;
* - это позволяет читать данные ровно такими пачками, какими они были записаны.
*/ */
/** Этот класс хранит список кусков и параметры структуры данных. /** Этот класс хранит список кусков и параметры структуры данных.
@ -399,18 +401,22 @@ public:
} }
size_t key_size = storage.sort_descr.size(); size_t key_size = storage.sort_descr.size();
index.resize(key_size * size);
String index_path = storage.full_path + name + "/primary.idx"; if (key_size)
ReadBufferFromFile index_file(index_path, {
std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize())); index.resize(key_size * size);
for (size_t i = 0; i < size; ++i) String index_path = storage.full_path + name + "/primary.idx";
for (size_t j = 0; j < key_size; ++j) ReadBufferFromFile index_file(index_path,
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file); std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
if (!index_file.eof()) for (size_t i = 0; i < size; ++i)
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE); for (size_t j = 0; j < key_size; ++j)
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
if (!index_file.eof())
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
}
size_in_bytes = calcTotalSize(storage.full_path + name + "/"); size_in_bytes = calcTotalSize(storage.full_path + name + "/");
} }
@ -468,7 +474,7 @@ public:
if (!checksums.empty()) if (!checksums.empty())
{ {
if (!checksums.files.count("primary.idx")) if (!storage.sort_descr.empty() && !checksums.files.count("primary.idx"))
throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No checksum for primary.idx", ErrorCodes::NO_FILE_IN_DATA_PART);
if (require_part_metadata) if (require_part_metadata)
@ -486,12 +492,14 @@ public:
} }
else else
{ {
/// Проверяем, что первичный ключ непуст. if (!storage.sort_descr.empty())
{
/// Проверяем, что первичный ключ непуст.
Poco::File index_file(path + "/primary.idx");
Poco::File index_file(path + "/primary.idx"); if (!index_file.exists() || index_file.getSize() == 0)
throw Exception("Part " + path + " is broken: primary key is empty.", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (!index_file.exists() || index_file.getSize() == 0) }
throw Exception("Part " + path + " is broken: primary key is empty.", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
/// Проверяем, что все засечки непусты и имеют одинаковый размер. /// Проверяем, что все засечки непусты и имеют одинаковый размер.
@ -620,6 +628,7 @@ public:
Collapsing, Collapsing,
Summing, Summing,
Aggregating, Aggregating,
Unsorted,
}; };
static void doNothing(const String & name) {} static void doNothing(const String & name) {}
@ -628,7 +637,7 @@ public:
* (корректность имён и путей не проверяется) * (корректность имён и путей не проверяется)
* состоящую из указанных столбцов. * состоящую из указанных столбцов.
* *
* primary_expr_ast - выражение для сортировки; * primary_expr_ast - выражение для сортировки; Пустое для UnsortedMergeTree.
* date_column_name - имя столбца с датой; * date_column_name - имя столбца с датой;
* index_granularity - на сколько строчек пишется одно значение индекса. * index_granularity - на сколько строчек пишется одно значение индекса.
* require_part_metadata - обязательно ли в директории с куском должны быть checksums.txt и columns.txt * require_part_metadata - обязательно ли в директории с куском должны быть checksums.txt и columns.txt

View File

@ -226,7 +226,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
size_t rows_processed = process_list_elem->progress.rows; size_t rows_processed = process_list_elem->progress.rows;
size_t bytes_processed = process_list_elem->progress.bytes; size_t bytes_processed = process_list_elem->progress.bytes;
size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows); size_t total_rows_estimate = std::max(rows_processed, process_list_elem->progress.total_rows);
/** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения. /** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения.
* NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList? * NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList?
@ -257,12 +257,26 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
{ {
double total_elapsed = info.total_stopwatch.elapsedSeconds(); double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0 if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0)
&& rows_processed / total_elapsed < limits.min_execution_speed)
{ {
throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed) if (rows_processed / total_elapsed < limits.min_execution_speed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed), throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
ErrorCodes::TOO_SLOW); + " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
size_t total_rows = process_list_elem->progress.total_rows;
/// Если предсказанное время выполнения больше, чем max_execution_time.
if (limits.max_execution_time != 0 && total_rows)
{
double estimated_execution_time_seconds = total_elapsed * (static_cast<double>(total_rows) / rows_processed);
if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(UInt64(estimated_execution_time_seconds)) + " seconds)"
+ " is too long. Maximum: " + toString(limits.max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows),
ErrorCodes::TOO_SLOW);
}
} }
} }

View File

@ -0,0 +1,222 @@
#include <DB/IO/ReadBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Defines.h>
#include <sys/types.h>
#include <sys/stat.h>
namespace DB
{
ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
fill_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
request_ptrs{ &request }, events(1), filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
int open_flags = (flags_ == -1) ? O_RDONLY : flags_;
open_flags |= O_DIRECT;
fd = ::open(filename.c_str(), open_flags, mode_);
if (fd == -1)
{
got_exception = true;
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
throwFromErrno("Cannot open file " + filename, error_code);
}
::memset(&request, 0, sizeof(request));
}
ReadBufferAIO::~ReadBufferAIO()
{
if (!got_exception)
{
try
{
waitForAIOCompletion();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (fd != -1)
::close(fd);
}
void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_)
{
if (is_started)
{
got_exception = true;
throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR);
}
if ((max_bytes_read_ % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Invalid maximum number of bytes to read from file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
max_bytes_read = max_bytes_read_;
}
off_t ReadBufferAIO::seek(off_t off, int whence)
{
if ((off % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid offset for ReadBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
waitForAIOCompletion();
off_t new_pos;
if (whence == SEEK_SET)
{
if (off < 0)
{
got_exception = true;
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
new_pos = off;
}
else if (whence == SEEK_CUR)
{
if (off >= 0)
{
if (off > (std::numeric_limits<off_t>::max() - getPositionInFileRelaxed()))
{
got_exception = true;
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
}
else if (off < -getPositionInFileRelaxed())
{
got_exception = true;
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
new_pos = getPositionInFileRelaxed() + off;
}
else
{
got_exception = true;
throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (new_pos != getPositionInFileRelaxed())
{
off_t working_buffer_begin_pos = pos_in_file - static_cast<off_t>(working_buffer.size());
if (hasPendingData() && (new_pos >= working_buffer_begin_pos) && (new_pos <= pos_in_file))
{
/// Свдинулись, но остались в пределах буфера.
pos = working_buffer.begin() + (new_pos - working_buffer_begin_pos);
}
else
{
pos = working_buffer.end();
pos_in_file = new_pos;
}
}
return new_pos;
}
off_t ReadBufferAIO::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
off_t ReadBufferAIO::getPositionInFileRelaxed() const noexcept
{
return pos_in_file - (working_buffer.end() - pos);
}
bool ReadBufferAIO::nextImpl()
{
/// Если конец файла уже был достигнут при вызове этой функции,
/// то текущий вызов ошибочен.
if (is_eof)
return false;
waitForAIOCompletion();
/// При первом вызове не надо обменять местами основной и дублирующий буферы.
if (is_started)
swapBuffers();
else
is_started = true;
/// Если конец файла только что достигнут, больше ничего не делаем.
if (is_eof)
return true;
/// Создать запрос.
request.aio_lio_opcode = IOCB_CMD_PREAD;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(fill_buffer.internalBuffer().begin());
request.aio_nbytes = std::min(fill_buffer.internalBuffer().size(), max_bytes_read);
request.aio_offset = pos_in_file;
request.aio_reqprio = 0;
/// Отправить запрос.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR);
}
is_pending_read = true;
return true;
}
void ReadBufferAIO::waitForAIOCompletion()
{
if (is_pending_read)
{
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
is_pending_read = false;
off_t bytes_read = events[0].res;
if (bytes_read < 0)
{
got_exception = true;
throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR);
}
if ((bytes_read % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Received unaligned number of bytes from file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_read))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
pos_in_file += bytes_read;
total_bytes_read += bytes_read;
if (bytes_read > 0)
fill_buffer.buffer().resize(bytes_read);
if ((static_cast<size_t>(bytes_read) < fill_buffer.internalBuffer().size()) || (total_bytes_read == max_bytes_read))
is_eof = true;
}
}
void ReadBufferAIO::swapBuffers() noexcept
{
internalBuffer().swap(fill_buffer.internalBuffer());
buffer().swap(fill_buffer.buffer());
std::swap(position(), fill_buffer.position());
}
}

View File

@ -0,0 +1,202 @@
#include <DB/IO/WriteBufferAIO.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/Core/Defines.h>
#include <limits>
#include <sys/types.h>
#include <sys/stat.h>
namespace DB
{
WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_,
char * existing_memory_)
: BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE),
flush_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)),
request_ptrs{ &request }, events(1), filename(filename_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
int open_flags = (flags_ == -1) ? (O_WRONLY | O_TRUNC | O_CREAT) : flags_;
open_flags |= O_DIRECT;
fd = ::open(filename.c_str(), open_flags, mode_);
if (fd == -1)
{
got_exception = true;
auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
throwFromErrno("Cannot open file " + filename, error_code);
}
::memset(&request, 0, sizeof(request));
}
WriteBufferAIO::~WriteBufferAIO()
{
if (!got_exception)
{
try
{
flush();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (fd != -1)
::close(fd);
}
off_t WriteBufferAIO::seek(off_t off, int whence)
{
if ((off % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid offset for WriteBufferAIO::seek", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
flush();
if (whence == SEEK_SET)
{
if (off < 0)
{
got_exception = true;
throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
pos_in_file = off;
}
else if (whence == SEEK_CUR)
{
if (off >= 0)
{
if (off > (std::numeric_limits<off_t>::max() - pos_in_file))
{
got_exception = true;
throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
}
else if (off < -pos_in_file)
{
got_exception = true;
throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
pos_in_file += off;
}
else
{
got_exception = true;
throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
return pos_in_file;
}
off_t WriteBufferAIO::getPositionInFile()
{
return seek(0, SEEK_CUR);
}
void WriteBufferAIO::truncate(off_t length)
{
if ((length % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
throw Exception("Invalid length for WriteBufferAIO::ftruncate", ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
flush();
int res = ::ftruncate(fd, length);
if (res == -1)
{
got_exception = true;
throwFromErrno("Cannot truncate file " + filename, ErrorCodes::CANNOT_TRUNCATE_FILE);
}
}
void WriteBufferAIO::sync()
{
flush();
/// Попросим ОС сбросить данные на диск.
int res = ::fsync(fd);
if (res == -1)
{
got_exception = true;
throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC);
}
}
void WriteBufferAIO::flush()
{
next();
waitForAIOCompletion();
}
void WriteBufferAIO::nextImpl()
{
if (!offset())
return;
waitForAIOCompletion();
swapBuffers();
/// Создать запрос.
request.aio_lio_opcode = IOCB_CMD_PWRITE;
request.aio_fildes = fd;
request.aio_buf = reinterpret_cast<UInt64>(flush_buffer.buffer().begin());
request.aio_nbytes = flush_buffer.offset();
request.aio_offset = pos_in_file;
request.aio_reqprio = 0;
if ((request.aio_nbytes % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0)
{
got_exception = true;
throw Exception("Illegal attempt to write unaligned data to file " + filename, ErrorCodes::AIO_UNALIGNED_SIZE_ERROR);
}
/// Отправить запрос.
while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR);
}
is_pending_write = true;
}
void WriteBufferAIO::waitForAIOCompletion()
{
if (is_pending_write)
{
while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0)
if (errno != EINTR)
{
got_exception = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
}
is_pending_write = false;
off_t bytes_written = events[0].res;
if ((bytes_written < 0) || (static_cast<size_t>(bytes_written) < flush_buffer.offset()))
{
got_exception = true;
throw Exception("Asynchronous write error on file " + filename, ErrorCodes::AIO_WRITE_ERROR);
}
if (pos_in_file > (std::numeric_limits<off_t>::max() - bytes_written))
{
got_exception = true;
throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR);
}
pos_in_file += bytes_written;
}
}
void WriteBufferAIO::swapBuffers() noexcept
{
buffer().swap(flush_buffer.buffer());
std::swap(position(), flush_buffer.position());
}
}

View File

@ -0,0 +1,337 @@
#include <DB/IO/ReadBufferAIO.h>
#include <DB/Core/Defines.h>
#include <boost/filesystem.hpp>
#include <vector>
#include <iostream>
#include <fstream>
#include <functional>
#include <cstdlib>
#include <unistd.h>
namespace
{
void run();
void prepare(std::string & directory, std::string & filename, std::string & buf);
void die(const std::string & msg);
void run_test(unsigned int num, const std::function<bool()> func);
bool test1(const std::string & filename);
bool test2(const std::string & filename, const std::string & buf);
bool test3(const std::string & filename, const std::string & buf);
bool test4(const std::string & filename, const std::string & buf);
bool test5(const std::string & filename);
bool test6(const std::string & filename, const std::string & buf);
bool test7(const std::string & filename, const std::string & buf);
bool test8(const std::string & filename);
bool test9(const std::string & filename, const std::string & buf);
bool test10(const std::string & filename, const std::string & buf);
bool test11(const std::string & filename);
bool test12(const std::string & filename, const std::string & buf);
void run()
{
namespace fs = boost::filesystem;
std::string directory;
std::string filename;
std::string buf;
prepare(directory, filename, buf);
const std::vector<std::function<bool()> > tests =
{
std::bind(test1, std::ref(filename)),
std::bind(test2, std::ref(filename), std::ref(buf)),
std::bind(test3, std::ref(filename), std::ref(buf)),
std::bind(test4, std::ref(filename), std::ref(buf)),
std::bind(test5, std::ref(filename)),
std::bind(test6, std::ref(filename), std::ref(buf)),
std::bind(test7, std::ref(filename), std::ref(buf)),
std::bind(test8, std::ref(filename)),
std::bind(test9, std::ref(filename), std::ref(buf)),
std::bind(test10, std::ref(filename), std::ref(buf)),
std::bind(test11, std::ref(filename)),
std::bind(test12, std::ref(filename), std::ref(buf))
};
unsigned int num = 0;
for (const auto & test : tests)
{
++num;
run_test(num, test);
}
fs::remove_all(directory);
}
void prepare(std::string & directory, std::string & filename, std::string & buf)
{
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
directory = std::string(dir);
filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
std::ofstream out(filename.c_str());
if (!out.is_open())
die("Could not open file");
out << buf;
}
void die(const std::string & msg)
{
std::cout << msg << "\n";
::exit(EXIT_FAILURE);
}
void run_test(unsigned int num, const std::function<bool()> func)
{
bool ok;
try
{
ok = func();
}
catch (const DB::Exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.displayText() << "\n";
}
catch (const std::exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.what() << "\n";
}
if (ok)
std::cout << "Test " << num << " passed\n";
else
std::cout << "Test " << num << " failed\n";
}
bool test1(const std::string & filename)
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (in.getFileName() != filename)
return false;
if (in.getFD() == -1)
return false;
return true;
}
bool test2(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
return (newbuf == buf);
}
bool test3(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
size_t requested = 9 * DEFAULT_AIO_FILE_BLOCK_SIZE;
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
in.setMaxBytes(requested);
size_t count = in.read(&newbuf[0], newbuf.length());
newbuf.resize(count);
return (newbuf == buf.substr(0, requested));
}
bool test4(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
in.setMaxBytes(0);
size_t n_read = in.read(&newbuf[0], newbuf.length());
return n_read == 0;
}
bool test5(const std::string & filename)
{
bool ok = false;
try
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
in.setMaxBytes(DEFAULT_AIO_FILE_BLOCK_SIZE >> 1);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test6(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (in.getPositionInFile() != 0)
return false;
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
if (static_cast<size_t>(in.getPositionInFile()) != buf.length())
return false;
return true;
}
bool test7(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(buf.length() - DEFAULT_AIO_FILE_BLOCK_SIZE);
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
(void) in.seek(DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_SET);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != (9 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
return (newbuf == buf.substr(DEFAULT_AIO_FILE_BLOCK_SIZE));
}
bool test8(const std::string & filename)
{
bool ok = false;
try
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
(void) in.seek(DEFAULT_AIO_FILE_BLOCK_SIZE + 1, SEEK_CUR);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test9(const std::string & filename, const std::string & buf)
{
bool ok = false;
try
{
std::string newbuf;
newbuf.resize(buf.length());
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
in.setMaxBytes(9 * DEFAULT_AIO_FILE_BLOCK_SIZE);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test10(const std::string & filename, const std::string & buf)
{
std::string newbuf;
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count1 = in.read(&newbuf[0], newbuf.length());
if (count1 != newbuf.length())
return false;
if (newbuf != buf.substr(0, 4 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
(void) in.seek(2 * DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
size_t count2 = in.read(&newbuf[0], newbuf.length());
if (count2 != newbuf.length())
return false;
if (newbuf != buf.substr(6 * DEFAULT_AIO_FILE_BLOCK_SIZE))
return false;
return true;
}
bool test11(const std::string & filename)
{
bool ok = false;
try
{
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
(void) in.seek(-DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_SET);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
bool test12(const std::string & filename, const std::string & buf)
{
bool ok = false;
try
{
std::string newbuf;
newbuf.resize(4 * DEFAULT_AIO_FILE_BLOCK_SIZE);
DB::ReadBufferAIO in(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
size_t count = in.read(&newbuf[0], newbuf.length());
if (count != newbuf.length())
return false;
(void) in.seek(-(10 * DEFAULT_AIO_FILE_BLOCK_SIZE), SEEK_CUR);
}
catch (const DB::Exception &)
{
ok = true;
}
return ok;
}
}
int main()
{
run();
return 0;
}

View File

@ -0,0 +1,291 @@
#include <DB/IO/WriteBufferAIO.h>
#include <DB/Core/Defines.h>
#include <boost/filesystem.hpp>
#include <iostream>
#include <fstream>
#include <streambuf>
#include <cstdlib>
namespace
{
void run();
void die(const std::string & msg);
void run_test(unsigned int num, const std::function<bool()> func);
bool test1();
bool test2();
bool test3();
bool test4();
void run()
{
const std::vector<std::function<bool()> > tests =
{
test1,
test2,
test3,
test4
};
unsigned int num = 0;
for (const auto & test : tests)
{
++num;
run_test(num, test);
}
}
void die(const std::string & msg)
{
std::cout << msg;
::exit(EXIT_FAILURE);
}
void run_test(unsigned int num, const std::function<bool()> func)
{
bool ok;
try
{
ok = func();
}
catch (const DB::Exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.displayText() << "\n";
}
catch (const std::exception & ex)
{
ok = false;
std::cout << "Caught exception " << ex.what() << "\n";
}
if (ok)
std::cout << "Test " << num << " passed\n";
else
std::cout << "Test " << num << " failed\n";
}
bool test1()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length());
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
return (received == buf);
}
bool test2()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length() / 2);
out.seek(DEFAULT_AIO_FILE_BLOCK_SIZE, SEEK_CUR);
out.write(&buf[buf.length() / 2], buf.length() / 2);
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
if (received.substr(0, buf.length() / 2) != buf.substr(0, buf.length() / 2))
return false;
if (received.substr(buf.length() / 2, DEFAULT_AIO_FILE_BLOCK_SIZE) != std::string(DEFAULT_AIO_FILE_BLOCK_SIZE, '\0'))
return false;
if (received.substr(buf.length() / 2 + DEFAULT_AIO_FILE_BLOCK_SIZE) != buf.substr(buf.length() / 2))
return false;
return true;
}
bool test3()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length());
off_t pos1 = out.getPositionInFile();
out.truncate(buf.length() / 2);
off_t pos2 = out.getPositionInFile();
if (pos1 != pos2)
return false;
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
return (received == buf.substr(0, buf.length() / 2));
}
bool test4()
{
namespace fs = boost::filesystem;
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
if (dir == nullptr)
die("Could not create directory");
const std::string directory = std::string(dir);
const std::string filename = directory + "/foo";
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
std::string buf;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
{
DB::WriteBufferAIO out(filename, 3 * DEFAULT_AIO_FILE_BLOCK_SIZE);
if (out.getFileName() != filename)
return false;
if (out.getFD() == -1)
return false;
out.write(&buf[0], buf.length());
off_t pos1 = out.getPositionInFile();
out.truncate(3 * buf.length() / 2);
off_t pos2 = out.getPositionInFile();
if (pos1 != pos2)
return false;
}
std::ifstream in(filename.c_str());
if (!in.is_open())
die("Could not open file");
std::string received{ std::istreambuf_iterator<char>(in), std::istreambuf_iterator<char>() };
in.close();
fs::remove_all(directory);
if (received.substr(0, buf.length()) != buf)
return false;
if (received.substr(buf.length()) != std::string(buf.length() / 2, '\0'))
return false;
return true;
}
}
int main()
{
run();
return 0;
}

View File

@ -44,7 +44,7 @@ MergeTreeData::MergeTreeData(
date_column_name(date_column_name_), sampling_expression(sampling_expression_), date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_), index_granularity(index_granularity_),
mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_), mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_),
settings(settings_), primary_expr_ast(primary_expr_ast_->clone()), settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
require_part_metadata(require_part_metadata_), require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_), full_path(full_path_), columns(columns_),
broken_part_callback(broken_part_callback_), broken_part_callback(broken_part_callback_),
@ -89,18 +89,23 @@ MergeTreeData::MergeTreeData(
Poco::File(full_path).createDirectories(); Poco::File(full_path).createDirectories();
Poco::File(full_path + "detached").createDirectory(); Poco::File(full_path + "detached").createDirectory();
/// инициализируем описание сортировки if (primary_expr_ast)
sort_descr.reserve(primary_expr_ast->children.size());
for (const ASTPtr & ast : primary_expr_ast->children)
{ {
String name = ast->getColumnName(); /// инициализируем описание сортировки
sort_descr.push_back(SortColumnDescription(name, 1)); sort_descr.reserve(primary_expr_ast->children.size());
for (const ASTPtr & ast : primary_expr_ast->children)
{
String name = ast->getColumnName();
sort_descr.push_back(SortColumnDescription(name, 1));
}
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(false);
ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(true);
primary_key_sample = projected_expr->getSampleBlock();
} }
else if (mode != Unsorted)
primary_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(false); throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(true);
primary_key_sample = projected_expr->getSampleBlock();
} }
UInt64 MergeTreeData::getMaxDataPartIndex() UInt64 MergeTreeData::getMaxDataPartIndex()
@ -120,6 +125,7 @@ std::string MergeTreeData::getModePrefix() const
case Collapsing: return "Collapsing"; case Collapsing: return "Collapsing";
case Summing: return "Summing"; case Summing: return "Summing";
case Aggregating: return "Aggregating"; case Aggregating: return "Aggregating";
case Unsorted: return "Unsorted";
default: default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
@ -386,8 +392,14 @@ void MergeTreeData::checkAlter(const AlterCommands & params)
/// Список столбцов, которые нельзя трогать. /// Список столбцов, которые нельзя трогать.
/// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе. /// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
Names keys = primary_expr->getRequiredColumns();
Names keys;
if (primary_expr)
keys = primary_expr->getRequiredColumns();
keys.push_back(sign_column); keys.push_back(sign_column);
std::sort(keys.begin(), keys.end()); std::sort(keys.begin(), keys.end());
for (const AlterCommand & command : params) for (const AlterCommand & command : params)

View File

@ -8,6 +8,7 @@
#include <DB/DataStreams/SummingSortedBlockInputStream.h> #include <DB/DataStreams/SummingSortedBlockInputStream.h>
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h> #include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h> #include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
namespace DB namespace DB
@ -363,6 +364,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
default: default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
} }

View File

@ -350,7 +350,7 @@ StoragePtr StorageFactory::get(
} }
else if (endsWith(name, "MergeTree")) else if (endsWith(name, "MergeTree"))
{ {
/** Движки [Replicated][Summing|Collapsing|Aggregating|]MergeTree (8 комбинаций) /** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted]MergeTree (2 * 5 комбинаций)
* В качестве аргумента для движка должно быть указано: * В качестве аргумента для движка должно быть указано:
* - (для Replicated) Путь к таблице в ZooKeeper * - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper * - (для Replicated) Имя реплики в ZooKeeper
@ -367,6 +367,7 @@ StoragePtr StorageFactory::get(
* CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign) * CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign)
* SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum]) * SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
* AggregatingMergeTree(date, [sample_key], primary_key, index_granularity) * AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
* UnsortedMergeTree(date, index_granularity) TODO Добавить описание ниже.
*/ */
const char * verbose_help = R"( const char * verbose_help = R"(
@ -445,6 +446,8 @@ For further info please read the documentation: http://clickhouse.yandex-team.ru
mode = MergeTreeData::Summing; mode = MergeTreeData::Summing;
else if (name_part == "Aggregating") else if (name_part == "Aggregating")
mode = MergeTreeData::Aggregating; mode = MergeTreeData::Aggregating;
else if (name_part == "Unsorted")
mode = MergeTreeData::Unsorted;
else if (!name_part.empty()) else if (!name_part.empty())
throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE); throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE);
@ -458,7 +461,26 @@ For further info please read the documentation: http://clickhouse.yandex-team.ru
/// NOTE Слегка запутанно. /// NOTE Слегка запутанно.
size_t num_additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing); size_t num_additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing);
if (mode != MergeTreeData::Summing if (mode == MergeTreeData::Unsorted
&& args.size() != num_additional_params + 2)
{
String params;
if (replicated)
params +=
"\npath in ZooKeeper,"
"\nreplica name,";
params +=
"\nname of column with date,"
"\nindex granularity\n";
throw Exception("Storage " + name + " requires "
+ toString(num_additional_params + 2) + " parameters: " + params + verbose_help,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (mode != MergeTreeData::Summing && mode != MergeTreeData::Unsorted
&& args.size() != num_additional_params + 3 && args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4) && args.size() != num_additional_params + 4)
{ {
@ -579,9 +601,10 @@ For further info please read the documentation: http://clickhouse.yandex-team.ru
else else
throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
primary_expr_list = extractPrimaryKey(args[1]); if (mode != MergeTreeData::Unsorted)
primary_expr_list = extractPrimaryKey(args[1]);
auto ast = typeid_cast<ASTLiteral *>(&*args[2]); auto ast = typeid_cast<ASTLiteral *>(&*args.back());
if (ast && ast->value.getType() == Field::Types::UInt64) if (ast && ast->value.getType() == Field::Types::UInt64)
index_granularity = safeGet<UInt64>(ast->value); index_granularity = safeGet<UInt64>(ast->value);
else else