From 6395e841f9155e66c4aff1d49020f71108247d50 Mon Sep 17 00:00:00 2001
From: Sergey Fedorov
Date: Fri, 28 Mar 2014 18:36:24 +0400
Subject: [PATCH] Merge

---
 dbms/include/DB/Core/Defines.h                      |  2 ++
 dbms/include/DB/IO/UncompressedCache.h              | 16 ++++++++++----
 dbms/include/DB/Interpreters/Context.h              |  2 +-
 dbms/include/DB/Interpreters/Settings.h             |  4 ++++
 .../MergeTree/MergedBlockOutputStream.h             | 22 ++++++++++++++-----
 dbms/include/DB/Storages/StorageLog.h               | 10 +++++----
 dbms/include/DB/Storages/StorageTinyLog.h           | 10 +++++----
 dbms/src/Interpreters/Context.cpp                   |  4 ++--
 dbms/src/Storages/StorageChunks.cpp                 |  2 +-
 dbms/src/Storages/StorageFactory.cpp                |  4 ++--
 dbms/src/Storages/StorageLog.cpp                    | 14 ++++++------
 dbms/src/Storages/StorageTinyLog.cpp                | 14 ++++++------
 12 files changed, 66 insertions(+), 38 deletions(-)

diff --git a/dbms/include/DB/Core/Defines.h b/dbms/include/DB/Core/Defines.h
index 025bb85e44c..e492db12a58 100644
--- a/dbms/include/DB/Core/Defines.h
+++ b/dbms/include/DB/Core/Defines.h
@@ -13,6 +13,8 @@
 #define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
 #define DBMS_DEFAULT_POLL_INTERVAL 10
 
+#define DEFAULT_MIN_COMPRESS_BLOCK_SIZE 65536
+#define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576
 /// The default block size, in number of rows, for reading and writing data.
 #define DEFAULT_BLOCK_SIZE 1048576
 /// The same, but for merge operations. Smaller than DEFAULT_BLOCK_SIZE to save RAM (since all columns are read).
diff --git a/dbms/include/DB/IO/UncompressedCache.h b/dbms/include/DB/IO/UncompressedCache.h
index aebeaa9bdf8..4463d2726cb 100644
--- a/dbms/include/DB/IO/UncompressedCache.h
+++ b/dbms/include/DB/IO/UncompressedCache.h
@@ -17,17 +17,25 @@ struct UncompressedCacheCell
     size_t compressed_size;
 };
 
+struct UncompressedSizeWeightFunction
+{
+    size_t operator()(const UncompressedCacheCell & x) const
+    {
+        return x.data.size();
+    }
+};
+
 
 /** Cache of decompressed blocks for CachedCompressedReadBuffer. Thread-safe.
   */
-class UncompressedCache : public LRUCache<UInt128, UncompressedCacheCell, UInt128Hash>
+class UncompressedCache : public LRUCache<UInt128, UncompressedCacheCell, UInt128Hash, UncompressedSizeWeightFunction>
 {
 private:
-    typedef LRUCache<UInt128, UncompressedCacheCell, UInt128Hash> Base;
+    typedef LRUCache<UInt128, UncompressedCacheCell, UInt128Hash, UncompressedSizeWeightFunction> Base;
 
 public:
-    UncompressedCache(size_t max_size_in_cells)
-        : Base(max_size_in_cells) {}
+    UncompressedCache(size_t max_size_in_bytes)
+        : Base(max_size_in_bytes) {}
 
     /// Compute the key from the path to the file and the offset.
     static UInt128 hash(const String & path_to_file, size_t offset)
diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h
index 0bf42ab6f3b..a57839ad699 100644
--- a/dbms/include/DB/Interpreters/Context.h
+++ b/dbms/include/DB/Interpreters/Context.h
@@ -288,7 +288,7 @@ public:
     const ProcessList & getProcessList() const { return shared->process_list; }
 
     /// Create a cache of decompressed blocks of the specified size. This can only be done once.
-    void setUncompressedCache(size_t cache_size_in_cells);
+    void setUncompressedCache(size_t max_size_in_bytes);
     UncompressedCachePtr getUncompressedCache() const;
 
     /// Create a mark cache of the specified size. This can only be done once.
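
Context for the UncompressedCache change above: the cache limit used to be expressed in cells (number of entries), so its real memory footprint depended on how large the cached blocks happened to be. Parameterizing LRUCache with UncompressedSizeWeightFunction turns the limit into a byte budget: each cell weighs as much as its decompressed data, which is why Context::setUncompressedCache now takes max_size_in_bytes. The sketch below illustrates the idea of a weight-aware LRU cache. It is a simplified stand-in, not ClickHouse's actual LRUCache (whose implementation is not part of this patch); WeightedLRUCache and its members are invented names, and locking is omitted even though the real cache is thread-safe.

#include <cstddef>
#include <list>
#include <memory>
#include <unordered_map>

/// LRU cache whose capacity is a total weight (here: bytes) rather than an
/// entry count. WeightFunction plays the role of UncompressedSizeWeightFunction.
template <typename Key, typename Value, typename Hash, typename WeightFunction>
class WeightedLRUCache
{
public:
    explicit WeightedLRUCache(size_t max_weight_) : max_weight(max_weight_) {}

    std::shared_ptr<Value> get(const Key & key)
    {
        auto it = cells.find(key);
        if (it == cells.end())
            return nullptr;
        queue.splice(queue.begin(), queue, it->second.position);  /// Mark as most recently used.
        return it->second.value;
    }

    void set(const Key & key, const std::shared_ptr<Value> & value)
    {
        auto it = cells.find(key);
        if (it != cells.end())
        {
            current_weight -= WeightFunction()(*it->second.value);
            queue.splice(queue.begin(), queue, it->second.position);
            it->second.value = value;
        }
        else
        {
            queue.push_front(key);
            cells[key] = Cell{value, queue.begin()};
        }
        current_weight += WeightFunction()(*value);

        /// Evict least recently used cells while the byte budget is exceeded
        /// (always keeping the cell that was just inserted or updated).
        while (current_weight > max_weight && cells.size() > 1)
        {
            auto evicted = cells.find(queue.back());
            current_weight -= WeightFunction()(*evicted->second.value);
            cells.erase(evicted);
            queue.pop_back();
        }
    }

private:
    struct Cell
    {
        std::shared_ptr<Value> value;
        typename std::list<Key>::iterator position;
    };

    size_t max_weight;
    size_t current_weight = 0;
    std::list<Key> queue;  /// Front = most recently used.
    std::unordered_map<Key, Cell, Hash> cells;
};

With a weight function of this shape, eviction frees bytes rather than slots, so a handful of huge blocks and a crowd of tiny ones are held to the same memory budget.
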
diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h
index 63230726cd2..e7a57de3ec1 100644
--- a/dbms/include/DB/Interpreters/Settings.h
+++ b/dbms/include/DB/Interpreters/Settings.h
@@ -24,6 +24,10 @@ struct Settings
       */
 
 #define APPLY_FOR_SETTINGS(M) \
+    /** The minimum size of a block of data ready to be compressed */ \
+    M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE) \
+    /** The maximum size of a block of data suitable for compression */ \
+    M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE) \
    /** The maximum block size for reading */ \
     M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE) \
     /** The maximum number of threads for query execution */ \
diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
index 50756c2c2c5..fac3b5166e2 100644
--- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
+++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
@@ -23,9 +23,9 @@ protected:
     typedef std::set<std::string> OffsetColumns;
 
     struct ColumnStream
     {
-        ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path) :
+        ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path, size_t max_compress_block_size = DBMS_DEFAULT_BUFFER_SIZE) :
             escaped_column_name(escaped_column_name_),
-            plain_file(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY),
+            plain_file(data_path, max_compress_block_size, O_TRUNC | O_CREAT | O_WRONLY),
             compressed_buf(plain_file), marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY),
             compressed(compressed_buf), marks(marks_file) {}
@@ -82,7 +82,8 @@ protected:
             column_streams[size_name] = new ColumnStream(
                 escaped_size_name,
                 path + escaped_size_name + ".bin",
-                path + escaped_size_name + ".mrk");
+                path + escaped_size_name + ".mrk",
+                storage.context.getSettings().max_compress_block_size);
 
             addStream(path, name, *type_arr->getNestedType(), level + 1);
         }
@@ -90,7 +91,8 @@
             column_streams[name] = new ColumnStream(
                 escaped_column_name,
                 path + escaped_column_name + ".bin",
-                path + escaped_column_name + ".mrk");
+                path + escaped_column_name + ".mrk",
+                storage.context.getSettings().max_compress_block_size);
     }
 
 
@@ -129,7 +131,11 @@
             }
 
             type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);
-            stream.compressed.nextIfAtEnd();    /// So that instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
+            /// Enough data may already have accumulated to compress into a new block.
+            if (stream.compressed.offset() > storage.context.getSettings().min_compress_block_size)
+                stream.compressed.next();
+            else
+                stream.compressed.nextIfAtEnd();    /// So that instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
             prev_mark += limit;
         }
     }
@@ -156,7 +162,11 @@
             }
 
             type.serializeBinary(column, stream.compressed, prev_mark, limit);
-            stream.compressed.nextIfAtEnd();    /// So that instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
+            /// Enough data may already have accumulated to compress into a new block.
+            if (stream.compressed.offset() > storage.context.getSettings().min_compress_block_size)
+                stream.compressed.next();
+            else
+                stream.compressed.nextIfAtEnd();    /// So that instead of marks pointing to the end of a compressed block, there are marks pointing to the beginning of the next one.
             prev_mark += limit;
         }
     }
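
The two hunks above apply the same policy at every mark boundary: after serializing one mark's worth of rows, the stream cuts the compressed block if enough uncompressed bytes have accumulated, and otherwise leaves it open so that the mark keeps pointing at the start of the next compressed block. A condensed restatement with the serialization machinery stripped away (finishMarkRange is an invented helper name; offset(), next() and nextIfAtEnd() are exactly the calls visible in the hunks):

#include <DB/IO/CompressedWriteBuffer.h>

/// Decide, at a mark boundary, whether to finish the current compressed block.
static void finishMarkRange(DB::CompressedWriteBuffer & compressed, size_t min_compress_block_size)
{
    if (compressed.offset() > min_compress_block_size)
        compressed.next();          /// Enough pending data: cut the block exactly at the mark.
    else
        compressed.nextIfAtEnd();   /// Otherwise flush only if the working buffer is full.
}

Note also that ColumnStream now sizes the plain_file buffer with max_compress_block_size instead of the fixed DBMS_DEFAULT_BUFFER_SIZE, so the file buffer scales with the largest compressed block the stream may emit; the default argument keeps existing call sites compiling, while the MergeTree call sites pass the setting explicitly.
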
diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h
index 0c74c712a92..c0520ff4cc6 100644
--- a/dbms/include/DB/Storages/StorageLog.h
+++ b/dbms/include/DB/Storages/StorageLog.h
@@ -88,8 +88,8 @@ private:
     struct Stream
     {
-        Stream(const std::string & data_path) :
-            plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY),
+        Stream(const std::string & data_path, size_t max_compress_block_size) :
+            plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
             compressed(plain)
         {
             plain_offset = Poco::File(data_path).getSize();
@@ -136,7 +136,7 @@ public:
     /** Attach the table with the corresponding name, along the corresponding path (with / at the end),
       * (the correctness of names and paths is not checked)
      * consisting of the specified columns; create the files if they do not exist.
       */
-    static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_);
+    static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_ = DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
 
     std::string getName() const { return "Log"; }
     std::string getTableName() const { return name; }
@@ -174,7 +174,7 @@ protected:
             throw Exception("There is no column " + _table_column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
     }
 
-    StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_);
+    StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_);
 
     /// Read the files with marks, if they have not been read yet.
     /// This is done lazily, so that the server starts quickly when there are a lot of tables.
@@ -216,6 +216,8 @@ private:
     bool loaded_marks;
 
+    size_t max_compress_block_size;
+
     /** For ordinary columns, the marks store the number of rows in the block.
       * For array columns and nested structures, there is more than one group of marks corresponding to different files:
       * - for the contents (the name.bin file), the total number of array elements in the block is stored,
diff --git a/dbms/include/DB/Storages/StorageTinyLog.h b/dbms/include/DB/Storages/StorageTinyLog.h
index 682efb70215..8c6ed56d37b 100644
--- a/dbms/include/DB/Storages/StorageTinyLog.h
+++ b/dbms/include/DB/Storages/StorageTinyLog.h
@@ -67,8 +67,8 @@ private:
     struct Stream
     {
-        Stream(const std::string & data_path) :
-            plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY),
+        Stream(const std::string & data_path, size_t max_compress_block_size) :
+            plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
             compressed(plain)
         {
         }
@@ -107,7 +107,7 @@ public:
     /** Attach the table with the corresponding name, along the corresponding path (with / at the end),
       * consisting of the specified columns.
      * If attach is not specified, create the directory if it does not exist.
      */
-    static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach);
+    static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_ = DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
 
     std::string getName() const { return "TinyLog"; }
     std::string getTableName() const { return name; }
@@ -134,6 +134,8 @@ private:
     String name;
     NamesAndTypesListPtr columns;
 
+    size_t max_compress_block_size;
+
     /// Column data
     struct ColumnData
     {
@@ -142,7 +144,7 @@ private:
     typedef std::map<String, ColumnData> Files_t;
     Files_t files;
 
-    StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach);
+    StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_);
 
     void addFile(const String & column_name, const IDataType & type, size_t level = 0);
 };
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index 67aa10b9f62..5712e750131 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -503,14 +503,14 @@ ProcessList::Element * Context::getProcessListElement()
 }
 
 
-void Context::setUncompressedCache(size_t cache_size_in_cells)
+void Context::setUncompressedCache(size_t max_size_in_bytes)
 {
     Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
 
     if (shared->uncompressed_cache)
         throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
 
-    shared->uncompressed_cache = new UncompressedCache(cache_size_in_cells);
+    shared->uncompressed_cache = new UncompressedCache(max_size_in_bytes);
 }
diff --git a/dbms/src/Storages/StorageChunks.cpp b/dbms/src/Storages/StorageChunks.cpp
index 7ff9741a3f6..c3579bbff2a 100644
--- a/dbms/src/Storages/StorageChunks.cpp
+++ b/dbms/src/Storages/StorageChunks.cpp
@@ -137,7 +137,7 @@ StorageChunks::StorageChunks(
     Context & context_,
     bool attach)
     :
-    StorageLog(path_, name_, columns_),
+    StorageLog(path_, name_, columns_, context_.getSettings().max_compress_block_size),
     database_name(database_name_),
     reference_counter(path_ + escapeForFileName(name_) + "/refcount.txt"),
     context(context_),
diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp
index 315390328b5..8e74418ddaa 100644
--- a/dbms/src/Storages/StorageFactory.cpp
+++ b/dbms/src/Storages/StorageFactory.cpp
@@ -68,7 +68,7 @@ StoragePtr StorageFactory::get(
 
     if (name == "Log")
     {
-        return StorageLog::create(data_path, table_name, columns);
+        return StorageLog::create(data_path, table_name, columns, context.getSettings().max_compress_block_size);
     }
     else if (name == "Chunks")
     {
@@ -119,7 +119,7 @@ StoragePtr StorageFactory::get(
     }
     else if (name == "TinyLog")
     {
-        return StorageTinyLog::create(data_path, table_name, columns, attach);
+        return StorageTinyLog::create(data_path, table_name, columns, attach, context.getSettings().max_compress_block_size);
     }
     else if (name == "Memory")
     {
diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp
index a8e77cb79f7..b1dd7ee1479 100644
--- a/dbms/src/Storages/StorageLog.cpp
+++ b/dbms/src/Storages/StorageLog.cpp
@@ -293,21 +293,21 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
         String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
         if (!streams.count(size_name))
             streams.insert(std::make_pair(size_name, new Stream(
-                storage.files[size_name].data_file.path())));
+                storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
 
         addStream(name, *type_arr->getNestedType(), level + 1);
     }
     else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
     {
         String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
 
-        streams[size_name] = new Stream(storage.files[size_name].data_file.path());
+        streams[size_name] = new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size);
 
         const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
         for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
             addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
     }
     else
-        streams[name] = new Stream(storage.files[name].data_file.path());
+        streams[name] = new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size);
 }
 
 
@@ -402,8 +402,8 @@ void LogBlockOutputStream::writeMarks(MarksForColumns marks)
 }
 
 
-StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_)
-    : path(path_), name(name_), columns(columns_), loaded_marks(false)
+StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_)
+    : path(path_), name(name_), columns(columns_), loaded_marks(false), max_compress_block_size(max_compress_block_size_)
 {
     if (columns->empty())
         throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@@ -417,9 +417,9 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
     marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
 }
 
-StoragePtr StorageLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_)
+StoragePtr StorageLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_)
 {
-    return (new StorageLog(path_, name_, columns_))->thisPtr();
+    return (new StorageLog(path_, name_, columns_, max_compress_block_size_))->thisPtr();
 }
diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp
index ab179fb5492..73cf65987e0 100644
--- a/dbms/src/Storages/StorageTinyLog.cpp
+++ b/dbms/src/Storages/StorageTinyLog.cpp
@@ -200,21 +200,21 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
     {
         String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
         if (!streams.count(size_name))
-            streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path())));
+            streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
 
         addStream(name, *type_arr->getNestedType(), level + 1);
     }
     else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
     {
         String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
 
-        streams[size_name] = new Stream(storage.files[size_name].data_file.path());
+        streams[size_name] = new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size);
 
         const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
         for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
             addStream(DataTypeNested::concatenateNestedName(name, it->first),
                *it->second, level + 1);
     }
     else
-        streams[name] = new Stream(storage.files[name].data_file.path());
+        streams[name] = new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size);
 }
 
 
@@ -285,8 +285,8 @@ void TinyLogBlockOutputStream::write(const Block & block)
 }
 
 
-StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach)
-    : path(path_), name(name_), columns(columns_)
+StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_)
+    : path(path_), name(name_), columns(columns_), max_compress_block_size(max_compress_block_size_)
 {
     if (columns->empty())
         throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@@ -303,9 +303,9 @@ StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & na
         addFile(it->first, *it->second);
 }
 
-StoragePtr StorageTinyLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach)
+StoragePtr StorageTinyLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_)
 {
-    return (new StorageTinyLog(path_, name_, columns_, attach))->thisPtr();
+    return (new StorageTinyLog(path_, name_, columns_, attach, max_compress_block_size_))->thisPtr();
 }
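
Taken together: min_compress_block_size controls how often compressed blocks are cut at mark boundaries, while max_compress_block_size is threaded through Settings, Context and StorageFactory into the storages' write-buffer sizes. The toy program below plays the cut policy forward; 8192 rows per mark and a 4-byte column are assumptions made for illustration only, not values fixed by this patch.

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main()
{
    const size_t min_compress_block_size = 65536;           /// DEFAULT_MIN_COMPRESS_BLOCK_SIZE
    const size_t bytes_per_mark = 8192 * sizeof(uint32_t);  /// 32768 bytes per mark (assumed granularity)

    size_t pending = 0;  /// Uncompressed bytes accumulated since the last cut.
    for (int mark = 0; mark < 8; ++mark)
    {
        pending += bytes_per_mark;
        if (pending > min_compress_block_size)
        {
            /// Corresponds to stream.compressed.next(): the block ends at this mark.
            std::printf("mark %d: cut a compressed block of %zu bytes\n", mark, pending);
            pending = 0;
        }
        else
        {
            /// Corresponds to nextIfAtEnd(): keep accumulating.
            std::printf("mark %d: keep accumulating (%zu bytes pending)\n", mark, pending);
        }
    }
    return 0;
}

With these numbers a block is cut at every third mark, 98304 bytes at a time: large enough to compress well, small enough that a reader seeking to any single mark decompresses only a modest amount of extra data. Raising min_compress_block_size shifts that trade toward compression ratio; lowering it shifts it toward cheaper point reads.
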