This commit is contained in:
Alexey Arno 2015-04-14 18:00:57 +03:00
parent 00be055ebd
commit 7415b65a55
25 changed files with 92 additions and 69 deletions

View File

@ -20,6 +20,7 @@ private:
const std::string path;
UncompressedCache * cache;
size_t buf_size;
size_t estimated_size;
size_t aio_threshold;
/// SharedPtr - для ленивой инициализации (только в случае кэш-промаха).
@ -33,7 +34,7 @@ private:
{
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, aio_threshold, buf_size);
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
compressed_in = &*file_in;
}
}
@ -81,9 +82,10 @@ private:
}
public:
/** Constructor: only stores its arguments. ReadBuffer is initialised with (nullptr, 0), file_pos with 0,
  * and the body is empty. The form listed first takes (path_, cache_, aio_threshold_, buf_size_).
  */
CachedCompressedReadBuffer(const std::string & path_, UncompressedCache * cache_, size_t aio_threshold_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), aio_threshold(aio_threshold_), file_pos(0)
/// The form listed second additionally takes estimated_size_ (before aio_threshold_) and stores it in estimated_size.
CachedCompressedReadBuffer(const std::string & path_, UncompressedCache * cache_, size_t estimated_size_,
size_t aio_threshold_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_),
estimated_size(estimated_size_), aio_threshold(aio_threshold_), file_pos(0)
{
}

View File

@ -42,9 +42,9 @@ private:
}
public:
CompressedReadBufferFromFile(const std::string & path, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
CompressedReadBufferFromFile(const std::string & path, size_t estimated_size, size_t aio_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<ReadBuffer>(0),
p_file_in(createReadBufferFromFileBase(path, aio_threshold, buf_size)),
p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size)),
file_in(*p_file_in)
{
compressed_in = &file_in;

View File

@ -10,7 +10,6 @@
#include <limits>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
namespace DB
{
@ -29,8 +28,8 @@ public:
void setMaxBytes(size_t max_bytes_read_);
off_t getPositionInFile() override { return first_unread_pos_in_file - (working_buffer.end() - pos); }
std::string getFileName() const noexcept override { return filename; }
int getFD() const noexcept override { return fd; }
std::string getFileName() const override { return filename; }
int getFD() const override { return fd; }
private:
///

View File

@ -16,8 +16,8 @@ public:
virtual ~ReadBufferFromFileBase();
off_t seek(off_t off, int whence = SEEK_SET);
virtual off_t getPositionInFile() = 0;
virtual std::string getFileName() const noexcept = 0;
virtual int getFD() const noexcept = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
protected:
virtual off_t doSeek(off_t off, int whence) = 0;

View File

@ -54,7 +54,7 @@ protected:
}
/// File name or description.
/// This implementation returns a description built from the descriptor number: "(fd = <fd>)".
virtual std::string getFileName() const noexcept override
virtual std::string getFileName() const override
{
return "(fd = " + toString(fd) + ")";
}
@ -63,7 +63,7 @@ public:
ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {}
/// Returns the value of the fd member (set from the fd_ constructor argument).
int getFD() const noexcept override
int getFD() const override
{
return fd;
}

View File

@ -28,8 +28,8 @@ public:
off_t getPositionInFile() override;
void truncate(off_t length = 0) override;
void sync() override;
std::string getFileName() const noexcept override { return filename; }
int getFD() const noexcept override { return fd; }
std::string getFileName() const override { return filename; }
int getFD() const override { return fd; }
private:
///

View File

@ -18,8 +18,8 @@ public:
virtual off_t getPositionInFile() = 0;
virtual void truncate(off_t length) = 0;
virtual void sync() = 0;
virtual std::string getFileName() const noexcept = 0;
virtual int getFD() const noexcept = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
protected:
virtual off_t doSeek(off_t off, int whence) = 0;

View File

@ -41,7 +41,7 @@ protected:
}
/// File name or description.
/// This implementation returns a description built from the descriptor number: "(fd = <fd>)".
virtual std::string getFileName() const noexcept override
virtual std::string getFileName() const override
{
return "(fd = " + toString(fd) + ")";
}
@ -70,7 +70,7 @@ public:
}
}
/// Returns the value of the fd member.
int getFD() const noexcept override
int getFD() const override
{
return fd;
}

View File

@ -2,12 +2,19 @@
#include <DB/IO/ReadBufferFromFileBase.h>
#include <string>
#include <sys/stat.h>
namespace DB
{
/** Создать объект для чтения данных из файла.
* estimated_size - количество байтов, которые надо читать
* aio_threshold - минимальное количество байтов для асинхронных операций чтения
*
* Если aio_threshold = 0 или estimated_size < aio_threshold, операции чтения выполняются синхронно.
* В противном случае операции чтения выполняются асинхронно.
*/
ReadBufferFromFileBase * createReadBufferFromFileBase(const std::string & filename_,
size_t estimated_size,
size_t aio_threshold,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
int flags_ = -1,

View File

@ -2,11 +2,17 @@
#include <DB/IO/WriteBufferFromFileBase.h>
#include <string>
#include <sys/stat.h>
namespace DB
{
/** Создать объект для записи данных в файл.
* estimated_size - количество байтов, которые надо записать
* aio_threshold - минимальное количество байтов для асинхронных операций записи
*
* Если aio_threshold = 0 или estimated_size < aio_threshold, операции записи выполняются синхронно.
* В противном случае операции записи выполняются асинхронно.
*/
WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & filename_,
size_t estimated_size,
size_t aio_threshold,

View File

@ -15,21 +15,21 @@ class InterpreterOptimizeQuery
{
public:
/// Stores the query AST and the context. In the first initializer list shown, aio_threshold is also
/// initialised from context_.getSettings().min_bytes_to_use_direct_io; the second list does not set it.
InterpreterOptimizeQuery(ASTPtr query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_),
aio_threshold(context_.getSettings().min_bytes_to_use_direct_io) {}
: query_ptr(query_ptr_), context(context_)
{
}
/// Runs the OPTIMIZE query: casts query_ptr to ASTOptimizeQuery, looks up the table by
/// (ast.database, ast.table) through the context, takes a structure lock via lockStructure(true),
/// and calls optimize on the table. The lock object lives until the end of this function.
void execute()
{
const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true);
/// Two call forms are listed: one passes the stored aio_threshold, the other passes the context settings.
table->optimize(aio_threshold);
table->optimize(context.getSettings());
}
private:
ASTPtr query_ptr;
Context context;
size_t aio_threshold;
};

View File

@ -236,9 +236,9 @@ public:
/** Perform some background work. For example, merging parts in a table of the MergeTree type.
  * Returns whether any work was done.
  * Two forms are listed here: one takes an aio_threshold (default 0) and returns performOptimize(aio_threshold);
  * the other is virtual, takes Settings, and throws Exception with ErrorCodes::NOT_IMPLEMENTED.
  */
bool optimize(size_t aio_threshold = 0)
virtual bool optimize(const Settings & settings)
{
return performOptimize(aio_threshold);
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Получить запрос CREATE TABLE, который описывает данную таблицу.
@ -279,12 +279,6 @@ public:
/// Checks the validity of the data.
/// This default implementation always throws DB::Exception with a message naming the storage.
virtual bool checkData() const
{
    const auto message = "Check query is not supported for " + getName() + " storage";
    throw DB::Exception(message);
}
protected:
/// Default implementation: always throws Exception with ErrorCodes::NOT_IMPLEMENTED
/// and a message naming the storage. The aio_threshold argument is not used here.
virtual bool performOptimize(size_t aio_threshold)
{
    const auto message = "Method optimize is not supported by storage " + getName();
    throw Exception(message, ErrorCodes::NOT_IMPLEMENTED);
}
protected:
using ITableDeclaration::ITableDeclaration;

View File

@ -32,7 +32,7 @@ struct MarkInCompressedFile
typedef std::vector<MarkInCompressedFile> MarksInCompressedFile;
/// Оценка количества байт, занимаемых засечками в кеше.
/// Оценка количества байтов, занимаемых засечками в кеше.
struct MarksWeightFunction
{
size_t operator()(const MarksInCompressedFile & marks) const

View File

@ -256,16 +256,36 @@ private:
(*marks)[right].offset_in_compressed_file - (*marks)[all_mark_ranges[i].begin].offset_in_compressed_file);
}
size_t buffer_size = max_read_buffer_size < max_mark_range ? max_read_buffer_size : max_mark_range;
size_t buffer_size = std::min(max_read_buffer_size, max_mark_range);
/// Estimate how many compressed bytes will be read across all requested mark ranges.
/// The estimate is only computed when aio_threshold > 0; otherwise it stays 0.
size_t estimated_size = 0;
if (aio_threshold > 0)
{
    for (const auto & mark_range : all_mark_ranges)
    {
        const size_t offset_begin = (*marks)[mark_range.begin].offset_in_compressed_file;

        /// A range whose end lies past the last mark extends to the end of the data file.
        size_t offset_end;
        if (mark_range.end < (*marks).size())
            offset_end = (*marks)[mark_range.end].offset_in_compressed_file;
        else
            offset_end = Poco::File(path_prefix + ".bin").getSize();

        /// Both offsets are unsigned: only accumulate when the end is really past the begin,
        /// so that an inconsistent mark or a too-short file cannot wrap the difference around
        /// to a huge value and inflate the estimate.
        if (offset_end > offset_begin)
            estimated_size += offset_end - offset_begin;
    }
}
/// Choose the reader for path_prefix + ".bin": with an uncompressed cache the data goes through
/// CachedCompressedReadBuffer, otherwise through CompressedReadBufferFromFile.
/// In both cases data_buffer is set to point at the object that was just created.
if (uncompressed_cache)
{
cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache, aio_threshold, buffer_size);
/// Form that also passes estimated_size (before aio_threshold).
cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache,
estimated_size, aio_threshold, buffer_size);
data_buffer = &*cached_buffer;
}
else
{
non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", aio_threshold, buffer_size);
/// Form that also passes estimated_size (before aio_threshold).
non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", estimated_size,
aio_threshold, buffer_size);
data_buffer = &*non_cached_buffer;
}
}

View File

@ -70,6 +70,7 @@ public:
/// Сбрасывает все буферы в подчинённую таблицу.
void shutdown() override;
bool optimize(const Settings & settings) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override { name = new_table_name; }
@ -123,8 +124,6 @@ private:
void writeBlockToDestination(const Block & block, StoragePtr table);
void flushThread();
bool performOptimize(size_t aio_threshold) override;
};
}

View File

@ -32,6 +32,7 @@ public:
BlockOutputStreamPtr write(ASTPtr query) override;
void drop() override;
bool optimize(const Settings & settings) override;
BlockInputStreams read(
const Names & column_names,
@ -55,8 +56,6 @@ private:
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
bool attach_);
bool performOptimize(size_t aio_threshold) override;
};
}

View File

@ -79,6 +79,13 @@ public:
BlockOutputStreamPtr write(ASTPtr query) override;
/** Perform the next step of merging parts.
  * The value of settings.min_bytes_to_use_direct_io is passed as the first argument of merge.
  */
bool optimize(const Settings & settings) override
{
    const size_t aio_threshold = settings.min_bytes_to_use_direct_io;
    return merge(aio_threshold, true);
}
void dropPartition(const Field & partition, bool detach, const Settings & settings) override;
void attachPartition(const Field & partition, bool unreplicated, bool part, const Settings & settings) override;
void freezePartition(const Field & partition, const Settings & settings) override;
@ -93,14 +100,6 @@ public:
MergeTreeData & getData() { return data; }
private:
/** Выполнить очередной шаг объединения кусков.
*/
bool performOptimize(size_t aio_threshold) override
{
return merge(aio_threshold, true);
}
private:
String path;
String database_name;

View File

@ -83,6 +83,8 @@ public:
BlockOutputStreamPtr write(ASTPtr query) override;
bool optimize(const Settings & settings) override;
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
void dropPartition(const Field & partition, bool detach, const Settings & settings) override;
@ -386,8 +388,6 @@ private:
*/
void waitForReplicaToProcessLogEntry(const String & replica_name, const LogEntry & entry);
bool performOptimize(size_t aio_threshold) override;
/// Преобразовать число в строку формате суффиксов автоинкрементных нод в ZooKeeper.
static String padIndex(UInt64 index)
{

View File

@ -17,4 +17,4 @@ off_t WriteBufferFromFileBase::seek(off_t off, int whence)
return doSeek(off, whence);
}
}
}

View File

@ -1,17 +1,14 @@
#include <DB/IO/createReadBufferFromFileBase.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/ReadBufferAIO.h>
#include <Poco/File.h>
namespace DB
{
ReadBufferFromFileBase * createReadBufferFromFileBase(const std::string & filename_, size_t aio_threshold,
size_t buffer_size_, int flags_, char * existing_memory_, size_t alignment)
ReadBufferFromFileBase * createReadBufferFromFileBase(const std::string & filename_, size_t estimated_size,
size_t aio_threshold, size_t buffer_size_, int flags_, char * existing_memory_, size_t alignment)
{
size_t file_size = (aio_threshold > 0) ? Poco::File(filename_).getSize() : 0;
if ((aio_threshold == 0) || (file_size < aio_threshold))
if ((aio_threshold == 0) || (estimated_size < aio_threshold))
return new ReadBufferFromFile(filename_, buffer_size_, flags_, existing_memory_, alignment);
else
return new ReadBufferAIO(filename_, buffer_size_, flags_, existing_memory_);

View File

@ -6,7 +6,7 @@ namespace DB
{
WriteBufferFromFileBase * createWriteBufferFromFileBase(const std::string & filename_, size_t estimated_size,
size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_,
size_t aio_threshold, size_t buffer_size_, int flags_, mode_t mode, char * existing_memory_,
size_t alignment)
{
if ((aio_threshold == 0) || (estimated_size < aio_threshold))

View File

@ -25,7 +25,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, std::numeric_limits<size_t>::max());
CachedCompressedReadBuffer in(path, &cache, 0, 0);
WriteBufferFromFile out("/dev/null");
copyData(in, out);
@ -37,7 +37,7 @@ int main(int argc, char ** argv)
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, &cache, std::numeric_limits<size_t>::max());
CachedCompressedReadBuffer in(path, &cache, 0, 0);
WriteBufferFromFile out("/dev/null");
copyData(in, out);

View File

@ -265,11 +265,11 @@ void StorageBuffer::shutdown()
if (flush_thread.joinable())
flush_thread.join();
optimize();
optimize(context.getSettings());
}
bool StorageBuffer::performOptimize(size_t aio_threshold)
bool StorageBuffer::optimize(const Settings & settings)
{
flushAllBuffers(false);
@ -442,7 +442,7 @@ void StorageBuffer::alter(const AlterCommands & params, const String & database_
auto lock = lockStructureForAlter();
/// Чтобы не осталось блоков старой структуры.
optimize();
optimize(context.getSettings());
params.apply(*columns, materialized_columns, alias_columns, column_defaults);
InterpreterAlterQuery::updateMetadata(database_name, table_name,

View File

@ -129,9 +129,9 @@ void StorageMaterializedView::drop()
}
}
/// Delegates to the optimize method of the object pointed to by data and returns its result.
/// Two forms are listed: performOptimize(size_t aio_threshold) and optimize(const Settings & settings);
/// each forwards its own argument unchanged.
bool StorageMaterializedView::performOptimize(size_t aio_threshold)
bool StorageMaterializedView::optimize(const Settings & settings)
{
return data->optimize(aio_threshold);
return data->optimize(settings);
}

View File

@ -2060,7 +2060,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
}
bool StorageReplicatedMergeTree::performOptimize(size_t aio_threshold)
bool StorageReplicatedMergeTree::optimize(const Settings & settings)
{
/// Померджим какие-нибудь куски из директории unreplicated.
/// TODO: Мерджить реплицируемые куски тоже.
@ -2074,12 +2074,13 @@ bool StorageReplicatedMergeTree::performOptimize(size_t aio_threshold)
MergeTreeData::DataPartsVector parts;
String merged_name;
auto always_can_merge = [](const MergeTreeData::DataPartPtr &a, const MergeTreeData::DataPartPtr &b) { return true; };
auto always_can_merge = [](const MergeTreeData::DataPartPtr & a, const MergeTreeData::DataPartPtr & b) { return true; };
if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
return false;
const auto & merge_entry = context.getMergeList().insert(database_name, table_name, merged_name);
unreplicated_merger->mergeParts(parts, merged_name, *merge_entry, aio_threshold);
unreplicated_merger->mergeParts(parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io);
return true;
}