This commit is contained in:
Sergey Fedorov 2014-03-28 18:36:24 +04:00
parent fd6cf4974d
commit 6395e841f9
12 changed files with 66 additions and 38 deletions

View File

@ -13,6 +13,8 @@
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300 #define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
#define DBMS_DEFAULT_POLL_INTERVAL 10 #define DBMS_DEFAULT_POLL_INTERVAL 10
#define DEFAULT_MIN_COMPRESS_BLOCK_SIZE 65536
#define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576
/// Какими блоками по-умолчанию читаются и пишутся данные (в числе строк). /// Какими блоками по-умолчанию читаются и пишутся данные (в числе строк).
#define DEFAULT_BLOCK_SIZE 1048576 #define DEFAULT_BLOCK_SIZE 1048576
/// То же самое, но для операций слияния. Меньше DEFAULT_BLOCK_SIZE для экономии оперативки (так как читаются все столбцы). /// То же самое, но для операций слияния. Меньше DEFAULT_BLOCK_SIZE для экономии оперативки (так как читаются все столбцы).

View File

@ -17,17 +17,25 @@ struct UncompressedCacheCell
size_t compressed_size; size_t compressed_size;
}; };
struct UncompressedSizeWeightFunction
{
size_t operator()(const UncompressedCacheCell & x) const
{
return x.data.size();
}
};
/** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe. /** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe.
*/ */
class UncompressedCache : public LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash> class UncompressedCache : public LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash, UncompressedSizeWeightFunction>
{ {
private: private:
typedef LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash> Base; typedef LRUCache<UInt128, UncompressedCacheCell, UInt128TrivialHash, UncompressedSizeWeightFunction> Base;
public: public:
UncompressedCache(size_t max_size_in_cells) UncompressedCache(size_t max_size_in_bytes)
: Base(max_size_in_cells) {} : Base(max_size_in_bytes) {}
/// Посчитать ключ от пути к файлу и смещения. /// Посчитать ключ от пути к файлу и смещения.
static UInt128 hash(const String & path_to_file, size_t offset) static UInt128 hash(const String & path_to_file, size_t offset)

View File

@ -288,7 +288,7 @@ public:
const ProcessList & getProcessList() const { return shared->process_list; } const ProcessList & getProcessList() const { return shared->process_list; }
/// Создать кэш разжатых блоков указанного размера. Это можно сделать только один раз. /// Создать кэш разжатых блоков указанного размера. Это можно сделать только один раз.
void setUncompressedCache(size_t cache_size_in_cells); void setUncompressedCache(size_t max_size_in_bytes);
UncompressedCachePtr getUncompressedCache() const; UncompressedCachePtr getUncompressedCache() const;
/// Создать кэш засечек указанного размера. Это можно сделать только один раз. /// Создать кэш засечек указанного размера. Это можно сделать только один раз.

View File

@ -24,6 +24,10 @@ struct Settings
*/ */
#define APPLY_FOR_SETTINGS(M) \ #define APPLY_FOR_SETTINGS(M) \
/** Минимальный размер блока, готового для сжатия */ \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE) \
/** Максимальный размер блока, пригодного для сжатия */ \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE) \
/** Максимальный размер блока для чтения */ \ /** Максимальный размер блока для чтения */ \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE) \ M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE) \
/** Максимальное количество потоков выполнения запроса */ \ /** Максимальное количество потоков выполнения запроса */ \

View File

@ -23,9 +23,9 @@ protected:
typedef std::set<std::string> OffsetColumns; typedef std::set<std::string> OffsetColumns;
struct ColumnStream struct ColumnStream
{ {
ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path) : ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path, size_t max_compress_block_size = DBMS_DEFAULT_BUFFER_SIZE) :
escaped_column_name(escaped_column_name_), escaped_column_name(escaped_column_name_),
plain_file(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY), plain_file(data_path, max_compress_block_size, O_TRUNC | O_CREAT | O_WRONLY),
compressed_buf(plain_file), compressed_buf(plain_file),
marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY),
compressed(compressed_buf), marks(marks_file) {} compressed(compressed_buf), marks(marks_file) {}
@ -82,7 +82,8 @@ protected:
column_streams[size_name] = new ColumnStream( column_streams[size_name] = new ColumnStream(
escaped_size_name, escaped_size_name,
path + escaped_size_name + ".bin", path + escaped_size_name + ".bin",
path + escaped_size_name + ".mrk"); path + escaped_size_name + ".mrk",
storage.context.getSettings().max_compress_block_size);
addStream(path, name, *type_arr->getNestedType(), level + 1); addStream(path, name, *type_arr->getNestedType(), level + 1);
} }
@ -90,7 +91,8 @@ protected:
column_streams[name] = new ColumnStream( column_streams[name] = new ColumnStream(
escaped_column_name, escaped_column_name,
path + escaped_column_name + ".bin", path + escaped_column_name + ".bin",
path + escaped_column_name + ".mrk"); path + escaped_column_name + ".mrk",
storage.context.getSettings().max_compress_block_size);
} }
@ -129,7 +131,11 @@ protected:
} }
type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit); type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);
stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. /// Уже могло накопиться достаточно данных для сжатия в новый блок.
if (stream.compressed.offset() > storage.context.getSettings().min_compress_block_size)
stream.compressed.next();
else
stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего.
prev_mark += limit; prev_mark += limit;
} }
} }
@ -156,7 +162,11 @@ protected:
} }
type.serializeBinary(column, stream.compressed, prev_mark, limit); type.serializeBinary(column, stream.compressed, prev_mark, limit);
stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. /// Уже могло накопиться достаточно данных для сжатия в новый блок.
if (stream.compressed.offset() > storage.context.getSettings().min_compress_block_size)
stream.compressed.next();
else
stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего.
prev_mark += limit; prev_mark += limit;
} }
} }

View File

@ -88,8 +88,8 @@ private:
struct Stream struct Stream
{ {
Stream(const std::string & data_path) : Stream(const std::string & data_path, size_t max_compress_block_size) :
plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY), plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain) compressed(plain)
{ {
plain_offset = Poco::File(data_path).getSize(); plain_offset = Poco::File(data_path).getSize();
@ -136,7 +136,7 @@ public:
* (корректность имён и путей не проверяется) * (корректность имён и путей не проверяется)
* состоящую из указанных столбцов; создать файлы, если их нет. * состоящую из указанных столбцов; создать файлы, если их нет.
*/ */
static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_); static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_ = DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
std::string getName() const { return "Log"; } std::string getName() const { return "Log"; }
std::string getTableName() const { return name; } std::string getTableName() const { return name; }
@ -174,7 +174,7 @@ protected:
throw Exception("There is no column " + _table_column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); throw Exception("There is no column " + _table_column_name + " in table " + getTableName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
} }
StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_); StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_);
/// Прочитать файлы с засечками, если они ещё не прочитаны. /// Прочитать файлы с засечками, если они ещё не прочитаны.
/// Делается лениво, чтобы при большом количестве таблиц, сервер быстро стартовал. /// Делается лениво, чтобы при большом количестве таблиц, сервер быстро стартовал.
@ -216,6 +216,8 @@ private:
bool loaded_marks; bool loaded_marks;
size_t max_compress_block_size;
/** Для обычных столбцов, в засечках указано количество строчек в блоке. /** Для обычных столбцов, в засечках указано количество строчек в блоке.
* Для столбцов-массивов и вложенных структур, есть более одной группы засечек, соответствующих разным файлам: * Для столбцов-массивов и вложенных структур, есть более одной группы засечек, соответствующих разным файлам:
* - для внутренностей (файла name.bin) - указано суммарное количество элементов массивов в блоке, * - для внутренностей (файла name.bin) - указано суммарное количество элементов массивов в блоке,

View File

@ -67,8 +67,8 @@ private:
struct Stream struct Stream
{ {
Stream(const std::string & data_path) : Stream(const std::string & data_path, size_t max_compress_block_size) :
plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY), plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
compressed(plain) compressed(plain)
{ {
} }
@ -107,7 +107,7 @@ public:
* состоящую из указанных столбцов. * состоящую из указанных столбцов.
* Если не указано attach - создать директорию, если её нет. * Если не указано attach - создать директорию, если её нет.
*/ */
static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach); static StoragePtr create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_ = DEFAULT_MAX_COMPRESS_BLOCK_SIZE);
std::string getName() const { return "TinyLog"; } std::string getName() const { return "TinyLog"; }
std::string getTableName() const { return name; } std::string getTableName() const { return name; }
@ -134,6 +134,8 @@ private:
String name; String name;
NamesAndTypesListPtr columns; NamesAndTypesListPtr columns;
size_t max_compress_block_size;
/// Данные столбца /// Данные столбца
struct ColumnData struct ColumnData
{ {
@ -142,7 +144,7 @@ private:
typedef std::map<String, ColumnData> Files_t; typedef std::map<String, ColumnData> Files_t;
Files_t files; Files_t files;
StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach); StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_);
void addFile(const String & column_name, const IDataType & type, size_t level = 0); void addFile(const String & column_name, const IDataType & type, size_t level = 0);
}; };

View File

@ -503,14 +503,14 @@ ProcessList::Element * Context::getProcessListElement()
} }
void Context::setUncompressedCache(size_t cache_size_in_cells) void Context::setUncompressedCache(size_t max_size_in_bytes)
{ {
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex); Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->uncompressed_cache) if (shared->uncompressed_cache)
throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR); throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->uncompressed_cache = new UncompressedCache(cache_size_in_cells); shared->uncompressed_cache = new UncompressedCache(max_size_in_bytes);
} }

View File

@ -137,7 +137,7 @@ StorageChunks::StorageChunks(
Context & context_, Context & context_,
bool attach) bool attach)
: :
StorageLog(path_, name_, columns_), StorageLog(path_, name_, columns_, context_.getSettings().max_compress_block_size),
database_name(database_name_), database_name(database_name_),
reference_counter(path_ + escapeForFileName(name_) + "/refcount.txt"), reference_counter(path_ + escapeForFileName(name_) + "/refcount.txt"),
context(context_), context(context_),

View File

@ -68,7 +68,7 @@ StoragePtr StorageFactory::get(
if (name == "Log") if (name == "Log")
{ {
return StorageLog::create(data_path, table_name, columns); return StorageLog::create(data_path, table_name, columns, context.getSettings().max_compress_block_size);
} }
else if (name == "Chunks") else if (name == "Chunks")
{ {
@ -119,7 +119,7 @@ StoragePtr StorageFactory::get(
} }
else if (name == "TinyLog") else if (name == "TinyLog")
{ {
return StorageTinyLog::create(data_path, table_name, columns, attach); return StorageTinyLog::create(data_path, table_name, columns, attach, context.getSettings().max_compress_block_size);
} }
else if (name == "Memory") else if (name == "Memory")
{ {

View File

@ -293,21 +293,21 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name)) if (!streams.count(size_name))
streams.insert(std::make_pair(size_name, new Stream( streams.insert(std::make_pair(size_name, new Stream(
storage.files[size_name].data_file.path()))); storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type)) else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{ {
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name] = new Stream(storage.files[size_name].data_file.path()); streams[size_name] = new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams[name] = new Stream(storage.files[name].data_file.path()); streams[name] = new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size);
} }
@ -402,8 +402,8 @@ void LogBlockOutputStream::writeMarks(MarksForColumns marks)
} }
StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_) StorageLog::StorageLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_)
: path(path_), name(name_), columns(columns_), loaded_marks(false) : path(path_), name(name_), columns(columns_), loaded_marks(false), max_compress_block_size(max_compress_block_size_)
{ {
if (columns->empty()) if (columns->empty())
throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); throw Exception("Empty list of columns passed to StorageLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@ -417,9 +417,9 @@ StorageLog::StorageLog(const std::string & path_, const std::string & name_, Nam
marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME); marks_file = Poco::File(path + escapeForFileName(name) + '/' + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
} }
StoragePtr StorageLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_) StoragePtr StorageLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, size_t max_compress_block_size_)
{ {
return (new StorageLog(path_, name_, columns_))->thisPtr(); return (new StorageLog(path_, name_, columns_, max_compress_block_size_))->thisPtr();
} }

View File

@ -200,21 +200,21 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
{ {
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name)) if (!streams.count(size_name))
streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
addStream(name, *type_arr->getNestedType(), level + 1); addStream(name, *type_arr->getNestedType(), level + 1);
} }
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type)) else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
{ {
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name] = new Stream(storage.files[size_name].data_file.path()); streams[size_name] = new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1);
} }
else else
streams[name] = new Stream(storage.files[name].data_file.path()); streams[name] = new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size);
} }
@ -285,8 +285,8 @@ void TinyLogBlockOutputStream::write(const Block & block)
} }
StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach) StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_)
: path(path_), name(name_), columns(columns_) : path(path_), name(name_), columns(columns_), max_compress_block_size(max_compress_block_size_)
{ {
if (columns->empty()) if (columns->empty())
throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED); throw Exception("Empty list of columns passed to StorageTinyLog constructor", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
@ -303,9 +303,9 @@ StorageTinyLog::StorageTinyLog(const std::string & path_, const std::string & na
addFile(it->first, *it->second); addFile(it->first, *it->second);
} }
StoragePtr StorageTinyLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach) StoragePtr StorageTinyLog::create(const std::string & path_, const std::string & name_, NamesAndTypesListPtr columns_, bool attach, size_t max_compress_block_size_)
{ {
return (new StorageTinyLog(path_, name_, columns_, attach))->thisPtr(); return (new StorageTinyLog(path_, name_, columns_, attach, max_compress_block_size_))->thisPtr();
} }