dbms: added uncompressed cache (experimental) [#CONV-8661].

This commit is contained in:
Alexey Milovidov 2013-09-08 05:53:10 +00:00
parent 1f06b7a95c
commit efae271ae0
14 changed files with 412 additions and 94 deletions

View File

@ -9,52 +9,49 @@ namespace DB
{
namespace detail
/** Замена std::vector<char> для использования в буферах.
* Отличается тем, что не делает лишний memset. (И почти ничего не делает.)
*/
struct Memory : boost::noncopyable
{
/** Замена std::vector<char> для использования в буферах.
* Отличается тем, что не делает лишний memset. (И почти ничего не делает.)
*/
struct Memory : boost::noncopyable
size_t m_capacity;
size_t m_size;
char * m_data;
Memory() : m_capacity(0), m_size(0), m_data(NULL) {}
Memory(size_t size_) : m_capacity(size_), m_size(m_capacity), m_data(size_ ? new char[m_capacity] : NULL) {}
~Memory()
{
size_t m_capacity;
size_t m_size;
char * m_data;
if (m_data)
{
delete[] m_data;
m_data = NULL;
}
}
Memory() : m_capacity(0), m_size(0), m_data(NULL) {}
Memory(size_t size_) : m_capacity(size_), m_size(m_capacity), m_data(size_ ? new char[m_capacity] : NULL) {}
size_t size() const { return m_size; }
const char & operator[](size_t i) const { return m_data[i]; }
char & operator[](size_t i) { return m_data[i]; }
~Memory()
void resize(size_t new_size)
{
if (new_size < m_capacity)
{
m_size = new_size;
return;
}
else
{
if (m_data)
{
delete[] m_data;
m_data = NULL;
}
m_capacity = new_size;
m_size = m_capacity;
m_data = new char[m_capacity];
}
size_t size() const { return m_size; }
const char & operator[](size_t i) const { return m_data[i]; }
char & operator[](size_t i) { return m_data[i]; }
void resize(size_t new_size)
{
if (new_size < m_capacity)
{
m_size = new_size;
return;
}
else
{
if (m_data)
delete[] m_data;
m_capacity = new_size;
m_size = m_capacity;
m_data = new char[m_capacity];
}
}
};
}
}
};
/** Буфер, который может сам владеть своим куском памяти для работы.
@ -64,7 +61,7 @@ template <typename Base>
class BufferWithOwnMemory : public Base
{
protected:
detail::Memory memory;
Memory memory;
public:
/// Если передать не-NULL existing_memory, то буфер не будет создавать свой кусок памяти, а будет использовать существующий (и не будет им владеть).
BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = NULL) : Base(NULL, 0), memory(existing_memory ? 0 : size)

View File

@ -0,0 +1,96 @@
#pragma once
#include <vector>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/UncompressedCache.h>
namespace DB
{
/** Буфер для чтения из сжатого файла с использованием кэша разжатых блоков.
* Кэш внешний - передаётся в качестве аргумента в конструктор.
* Позволяет увеличить производительность в случае, когда часто читаются одни и те же блоки.
* Недостатки:
* - в случае, если нужно читать много данных подряд, но из них только часть закэширована, приходится делать seek-и.
*/
class CachedCompressedReadBuffer : public ReadBuffer
{
private:
const std::string path;
size_t cur_offset; /// Смещение в сжатом файле.
UncompressedCache & cache;
size_t buf_size;
/// SharedPtr - для ленивой инициализации (только в случае кэш-промаха).
Poco::SharedPtr<ReadBufferFromFile> in;
Poco::SharedPtr<CompressedReadBuffer> compressed_in;
/// Кусок данных из кэша, или кусок считанных данных, который мы положим в кэш.
UncompressedCache::CellPtr owned_cell;
/// Нужно ли делать seek - при кэш-промахе после кэш-попадания.
bool need_seek;
bool nextImpl()
{
/// Проверим наличие разжатого блока в кэше, захватим владение этим блоком, если он есть.
UInt128 key = cache.hash(path, cur_offset);
owned_cell = cache.get(key);
if (!owned_cell)
{
/// Если нет - надо прочитать его из файла.
if (!compressed_in)
{
in = new ReadBufferFromFile(path, buf_size);
compressed_in = new CompressedReadBuffer(*in);
}
if (need_seek)
{
in->seek(cur_offset);
need_seek = false;
}
owned_cell = new UncompressedCache::Cell;
owned_cell->key = key;
/// Разжимать будем в кусок памяти, который будет в кэше.
compressed_in->setMemory(owned_cell->data);
size_t old_count = in->count();
compressed_in->next();
owned_cell->compressed_size = in->count() - old_count;
/// Положим данные в кэш.
cache.set(owned_cell);
}
else
{
need_seek = true;
}
if (owned_cell->data.m_size == 0)
return false;
internal_buffer = Buffer(owned_cell->data.m_data, owned_cell->data.m_data + owned_cell->data.m_size);
working_buffer = Buffer(owned_cell->data.m_data, owned_cell->data.m_data + owned_cell->data.m_size);
pos = working_buffer.begin();
cur_offset += owned_cell->compressed_size;
return true;
}
public:
CachedCompressedReadBuffer(const std::string & path_, size_t offset_, UncompressedCache & cache_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), path(path_), cur_offset(offset_), cache(cache_), buf_size(buf_size_), need_seek(cur_offset != 0)
{
}
};
}

View File

@ -24,6 +24,12 @@ private:
std::vector<char> compressed_buffer;
qlz_state_decompress * qlz_state;
/** Указатель на кусок памяти, куда будут разжиматься блоки.
* Это может быть либо свой кусок памяти из BufferWithOwnMemory (по-умолчанию),
* либо пользователь может попросить разжимать данные в свой кусок памяти (метод setMemory).
*/
Memory * maybe_own_memory;
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
bool readCompressedData(size_t & size_decompressed)
@ -72,9 +78,9 @@ private:
if (!readCompressedData(size_decompressed))
return false;
memory.resize(size_decompressed);
internal_buffer = Buffer(&memory[0], &memory[size_decompressed]);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
maybe_own_memory->resize(size_decompressed);
internal_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
working_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed);
@ -86,7 +92,8 @@ public:
: BufferWithOwnMemory<ReadBuffer>(0),
in(in_),
compressed_buffer(QUICKLZ_HEADER_SIZE),
qlz_state(NULL)
qlz_state(NULL),
maybe_own_memory(&memory)
{
}
@ -97,6 +104,13 @@ public:
}
/// Использовать предоставленный пользователем кусок памяти для разжатия. (Для реализации кэша разжатых блоков.)
void setMemory(Memory & memory_)
{
maybe_own_memory = &memory_;
}
size_t readBig(char * to, size_t n)
{
size_t bytes_read = 0;
@ -129,9 +143,9 @@ public:
}
else
{
memory.resize(size_decompressed);
internal_buffer = Buffer(&memory[0], &memory[size_decompressed]);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
maybe_own_memory->resize(size_decompressed);
internal_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
working_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed);

View File

@ -62,6 +62,7 @@ public:
off_t seek(off_t offset, int whence = SEEK_SET)
{
pos = working_buffer.end();
off_t res = lseek(fd, offset, whence);
if (-1 == res)
throwFromErrno("Cannot seek through file " + getFileName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

View File

@ -0,0 +1,101 @@
#pragma once
#include <vector>
#include <Poco/SharedPtr.h>
#include <Poco/Mutex.h>
#include <DB/Common/SipHash.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/Interpreters/AggregationCommon.h>
namespace DB
{
/** Кэш разжатых блоков для CachedCompressedReadBuffer. thread-safe.
* NOTE Использовать LRU вместо простой кэш-таблицы.
*/
class UncompressedCache
{
public:
struct Cell
{
UInt128 key;
Memory data;
size_t compressed_size;
Cell() { key.first = 0; key.second = 0; compressed_size = 0; }
};
/// В ячейках кэш-таблицы лежат SharedPtr-ы на разжатые блоки. Это нужно, чтобы можно было достать ячейку, захватив владение ею.
typedef Poco::SharedPtr<Cell> CellPtr;
typedef std::vector<CellPtr> Cells;
private:
size_t num_cells;
Cells cells;
mutable Poco::FastMutex mutex;
mutable size_t hits;
mutable size_t misses;
public:
UncompressedCache(size_t num_cells_)
: num_cells(num_cells_), cells(num_cells), hits(0), misses(0)
{
}
/// Посчитать ключ от пути к файлу и смещения.
static UInt128 hash(const String & path_to_file, size_t offset)
{
UInt128 key;
SipHash hash;
hash.update(path_to_file.data(), path_to_file.size() + 1);
hash.update(reinterpret_cast<const char *>(&offset), sizeof(offset));
hash.get128(key.first, key.second);
return key;
}
CellPtr get(UInt128 key) const
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
CellPtr cell = cells[key.first % num_cells];
if (cell && cell->key == key)
{
++hits;
return cell;
}
else
{
++misses;
return NULL;
}
}
void set(const CellPtr & new_cell)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
CellPtr & cell = cells[new_cell->key.first % num_cells];
if (!cell || cell->key != new_cell->key)
cell = new_cell;
}
void getStats(size_t & out_hits, size_t & out_misses) const volatile
{
/// Синхронизация не нужна.
out_hits = hits;
out_misses = misses;
}
};
typedef Poco::SharedPtr<UncompressedCache> UncompressedCachePtr;
}

View File

@ -9,6 +9,7 @@
#include <Yandex/logger_useful.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/IO/UncompressedCache.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/Storages/IStorage.h>
#include <DB/Functions/FunctionFactory.h>
@ -56,6 +57,7 @@ struct ContextShared
Users users; /// Известные пользователи.
Quotas quotas; /// Известные квоты на использование ресурсов.
ProcessList process_list; /// Исполняющиеся в данный момент запросы.
mutable UncompressedCachePtr uncompressed_cache; /// Кэш разжатых блоков.
Logger * log; /// Логгер.
ContextShared() : log(&Logger::get("Context")) {};
@ -169,6 +171,10 @@ public:
ProcessList & getProcessList() { return shared->process_list; }
const ProcessList & getProcessList() const { return shared->process_list; }
/// Создать кэш разжатых блоков указанного размера. Это можно сделать только один раз.
void setUncompressedCache(size_t cache_size_in_cells);
UncompressedCachePtr getUncompressedCache() const;
};

View File

@ -43,6 +43,8 @@ struct Settings
bool sign_rewrite;
/// Считать минимумы и максимумы столбцов результата. Они могут выводиться в JSON-форматах.
bool extremes;
/// Использовать ли кэш разжатых блоков.
bool use_uncompressed_cache;
/// Всевозможные ограничения на выполнение запроса.
Limits limits;
@ -62,7 +64,7 @@ struct Settings
poll_interval(DBMS_DEFAULT_POLL_INTERVAL),
distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE),
connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES),
sign_rewrite(false), extremes(false)
sign_rewrite(false), extremes(false), use_uncompressed_cache(true)
{
}

View File

@ -4,6 +4,8 @@
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/MergeTree/PKCondition.h>
#include <DB/IO/CachedCompressedReadBuffer.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
@ -17,12 +19,14 @@ public:
/// Параметры storage_ и owned_storage разделены, чтобы можно было сделать поток, не владеющий своим storage
/// (например, поток, сливаящий куски). В таком случае сам storage должен следить, чтобы не удалить данные, пока их читают.
MergeTreeBlockInputStream(const String & path_, /// Путь к куску
size_t block_size_, const Names & column_names_,
StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
const MarkRanges & mark_ranges_, StoragePtr owned_storage)
: IProfilingBlockInputStream(owned_storage), path(path_), block_size(block_size_), column_names(column_names_),
storage(storage_), owned_data_part(owned_data_part_),
mark_ranges(mark_ranges_), current_range(-1), rows_left_in_current_range(0)
size_t block_size_, const Names & column_names_,
StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
const MarkRanges & mark_ranges_, StoragePtr owned_storage, bool use_uncompressed_cache_)
: IProfilingBlockInputStream(owned_storage),
path(path_), block_size(block_size_), column_names(column_names_),
storage(storage_), owned_data_part(owned_data_part_),
mark_ranges(mark_ranges_), use_uncompressed_cache(use_uncompressed_cache_),
current_range(-1), rows_left_in_current_range(0)
{
LOG_TRACE(storage.log, "Reading " << mark_ranges.size() << " ranges from part " << owned_data_part->name
<< ", up to " << (mark_ranges.back().end - mark_ranges.front().begin) * storage.index_granularity
@ -275,36 +279,50 @@ private:
StorageMergeTree & storage;
const StorageMergeTree::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
MarkRanges mark_ranges; /// В каких диапазонах засечек читать.
bool use_uncompressed_cache;
int current_range; /// Какой из mark_ranges сейчас читаем.
size_t rows_left_in_current_range; /// Сколько строк уже прочитали из текущего элемента mark_ranges.
struct Stream
{
Stream(const String & path_prefix, size_t mark_number)
: plain(path_prefix + ".bin", std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize())),
compressed(plain)
Stream(const String & path_prefix, size_t mark_number, UncompressedCache * uncompressed_cache)
{
size_t buf_size = std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize());
size_t offset_in_compressed_file = 0;
size_t offset_in_decompressed_block = 0;
if (mark_number)
{
/// Прочитаем из файла с засечками смещение в файле с данными.
ReadBufferFromFile marks(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE);
marks.seek(mark_number * MERGE_TREE_MARK_SIZE);
size_t offset_in_compressed_file = 0;
size_t offset_in_decompressed_block = 0;
readIntBinary(offset_in_compressed_file, marks);
readIntBinary(offset_in_decompressed_block, marks);
plain.seek(offset_in_compressed_file);
compressed.next();
compressed.position() += offset_in_decompressed_block;
}
if (uncompressed_cache)
{
compressed = new CachedCompressedReadBuffer(
path_prefix + ".bin", offset_in_compressed_file, *uncompressed_cache, buf_size);
}
else
{
plain = new ReadBufferFromFile(path_prefix + ".bin", buf_size);
compressed = new CompressedReadBuffer(*plain);
if (offset_in_compressed_file)
plain->seek(offset_in_compressed_file);
}
compressed->next();
compressed->position() += offset_in_decompressed_block;
}
ReadBufferFromFile plain;
CompressedReadBuffer compressed;
SharedPtr<ReadBufferFromFile> plain; /// Может отсутствовать, если используется CachedCompressedReadBuffer.
SharedPtr<ReadBuffer> compressed; /// Либо CompressedReadBuffer, либо CachedCompressedReadBuffer.
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
@ -320,6 +338,8 @@ private:
*/
if (!Poco::File(path + escaped_column_name + ".bin").exists())
return;
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
@ -330,8 +350,7 @@ private:
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(
path + escaped_size_name,
mark_number)));
path + escaped_size_name, mark_number, uncompressed_cache)));
addStream(name, *type_arr->getNestedType(), mark_number, level + 1);
}
@ -341,8 +360,7 @@ private:
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams.insert(std::make_pair(size_name, new Stream(
path + escaped_size_name,
mark_number)));
path + escaped_size_name, mark_number, uncompressed_cache)));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
@ -350,8 +368,7 @@ private:
}
else
streams.insert(std::make_pair(name, new Stream(
path + escaped_column_name,
mark_number)));
path + escaped_column_name, mark_number, uncompressed_cache)));
}
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, size_t level = 0, bool read_offsets = true)
@ -363,7 +380,7 @@ private:
{
type_arr->deserializeOffsets(
column,
streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
*streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
}
@ -379,7 +396,7 @@ private:
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
*streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
if (column.size())
@ -399,7 +416,7 @@ private:
}
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
type.deserializeBinary(column, *streams[name]->compressed, max_rows_to_read);
}
};

View File

@ -302,8 +302,8 @@ private:
static String getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level);
BlockInputStreams spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
BlockInputStreams spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size);
BlockInputStreams spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size, bool use_uncompressed_cache);
BlockInputStreams spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size, bool use_uncompressed_cache);
/// Создать выражение "Sign == 1".
void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column);

View File

@ -0,0 +1,56 @@
#include <iostream>
#include <iomanip>
#include <DB/IO/CachedCompressedReadBuffer.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/copyData.h>
#include <statdaemons/Stopwatch.h>
int main(int argc, char ** argv)
{
using namespace DB;
try
{
UncompressedCache cache(1024);
std::string path = argv[1];
std::cerr << std::fixed << std::setprecision(3);
size_t hits = 0;
size_t misses = 0;
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, 0, cache);
WriteBufferFromFile out("1.tsv");
copyData(in, out);
std::cerr << "Elapsed: " << watch.elapsedSeconds() << std::endl;
}
cache.getStats(hits, misses);
std::cerr << "Hits: " << hits << ", misses: " << misses << std::endl;
{
Stopwatch watch;
CachedCompressedReadBuffer in(path, 0, cache);
WriteBufferFromFile out("2.tsv");
copyData(in, out);
std::cerr << "Elapsed: " << watch.elapsedSeconds() << std::endl;
}
cache.getStats(hits, misses);
std::cerr << "Hits: " << hits << ", misses: " << misses << std::endl;
}
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}

View File

@ -388,4 +388,21 @@ ProgressCallback Context::getProgressCallback() const
}
void Context::setUncompressedCache(size_t cache_size_in_cells)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->uncompressed_cache)
throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->uncompressed_cache = new UncompressedCache(cache_size_in_cells);
}
UncompressedCachePtr Context::getUncompressedCache() const
{
/// Исходим из допущения, что функция setUncompressedCache, если вызывалась, то раньше. Иначе поставьте mutex.
return shared->uncompressed_cache;
}
}

View File

@ -31,6 +31,7 @@ void Settings::set(const String & name, const Field & value)
else if (name == "connections_with_failover_max_tries") connections_with_failover_max_tries = safeGet<UInt64>(value);
else if (name == "sign_rewrite") sign_rewrite = safeGet<UInt64>(value);
else if (name == "extremes") extremes = safeGet<UInt64>(value);
else if (name == "use_uncompressed_cache") use_uncompressed_cache = safeGet<UInt64>(value);
else if (name == "profile") setProfile(get<const String &>(value));
else if (!limits.trySet(name, value))
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
@ -53,7 +54,8 @@ void Settings::set(const String & name, ReadBuffer & buf)
|| name == "distributed_connections_pool_size"
|| name == "connections_with_failover_max_tries"
|| name == "sign_rewrite"
|| name == "extremes")
|| name == "extremes"
|| name == "use_uncompressed_cache")
{
UInt64 value = 0;
readVarUInt(value, buf);
@ -86,7 +88,8 @@ void Settings::set(const String & name, const String & value)
|| name == "distributed_connections_pool_size"
|| name == "connections_with_failover_max_tries"
|| name == "sign_rewrite"
|| name == "extremes")
|| name == "extremes"
|| name == "use_uncompressed_cache")
{
set(name, parse<UInt64>(value));
}
@ -146,6 +149,7 @@ void Settings::serialize(WriteBuffer & buf) const
writeStringBinary("connections_with_failover_max_tries", buf); writeVarUInt(connections_with_failover_max_tries, buf);
writeStringBinary("sign_rewrite", buf); writeVarUInt(sign_rewrite, buf);
writeStringBinary("extremes", buf); writeVarUInt(extremes, buf);
writeStringBinary("use_uncompressed_cache", buf); writeVarUInt(use_uncompressed_cache, buf);
limits.serialize(buf);

View File

@ -105,6 +105,11 @@ int Server::main(const std::vector<std::string> & args)
/// Максимальное количество одновременно выполняющихся запросов.
global_context.getProcessList().setMaxSize(config.getInt("max_concurrent_queries", 0));
/// Размер кэша разжатых блоков. Если нулевой - кэш отключён.
size_t uncompressed_cache_size = config.getInt("uncompressed_cache_size", 0);
if (uncompressed_cache_size)
global_context.setUncompressedCache(uncompressed_cache_size);
/// Загружаем настройки.
Settings & settings = global_context.getSettingsRef();
settings.setProfile(config.getString("default_profile", "default"));

View File

@ -269,11 +269,11 @@ BlockInputStreams StorageMergeTree::read(
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
res = spreadMarkRangesAmongThreadsFinal(parts_with_ranges, threads, column_names_to_read, max_block_size);
res = spreadMarkRangesAmongThreadsFinal(parts_with_ranges, threads, column_names_to_read, max_block_size, settings.use_uncompressed_cache);
}
else
{
res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size);
res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size, settings.use_uncompressed_cache);
}
if (select.sample_size)
@ -292,7 +292,8 @@ BlockInputStreams StorageMergeTree::read(
/// Примерно поровну распределить засечки между потоками.
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size)
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(
RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size, bool use_uncompressed_cache)
{
/// На всякий случай перемешаем куски.
std::random_shuffle(parts.begin(), parts.end());
@ -347,9 +348,9 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataPar
/// Восстановим порядок отрезков.
std::reverse(part.ranges.begin(), part.ranges.end());
streams.push_back(new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
max_block_size, column_names, *this,
part.data_part, part.ranges, thisPtr()));
streams.push_back(new MergeTreeBlockInputStream(
full_path + part.data_part->name + '/', max_block_size, column_names, *this,
part.data_part, part.ranges, thisPtr(), use_uncompressed_cache));
need_marks -= marks_in_part;
parts.pop_back();
sum_marks_in_parts.pop_back();
@ -376,9 +377,9 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataPar
part.ranges.pop_back();
}
streams.push_back(new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
max_block_size, column_names, *this,
part.data_part, ranges_to_get_from_part, thisPtr()));
streams.push_back(new MergeTreeBlockInputStream(
full_path + part.data_part->name + '/', max_block_size, column_names, *this,
part.data_part, ranges_to_get_from_part, thisPtr(), use_uncompressed_cache));
}
if (streams.size() == 1)
@ -396,7 +397,8 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataPar
/// Распределить засечки между потоками и сделать, чтобы в ответе (почти) все данные были сколлапсированы (модификатор FINAL).
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size)
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(
RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size, bool use_uncompressed_cache)
{
ExpressionActionsPtr sign_filter_expression;
String sign_filter_column;
@ -408,9 +410,9 @@ BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreadsFinal(RangesInDa
{
RangesInDataPart & part = parts[part_index];
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
max_block_size, column_names, *this,
part.data_part, part.ranges, thisPtr());
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
full_path + part.data_part->name + '/', max_block_size, column_names, *this,
part.data_part, part.ranges, thisPtr(), use_uncompressed_cache);
to_collapse.push_back(new ExpressionBlockInputStream(source_stream, primary_expr));
}
@ -846,7 +848,7 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + parts[i]->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, parts[i], ranges, StoragePtr()), primary_expr));
full_path + parts[i]->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, parts[i], ranges, StoragePtr(), false), primary_expr));
}
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.