dbms: lowered default block size; introduced default insert block size [#METR-11014]..

This commit is contained in:
Alexey Milovidov 2014-04-30 00:22:57 +04:00
parent e1eac9c927
commit fc8aa282c0
8 changed files with 30 additions and 13 deletions

View File

@ -20,9 +20,21 @@
#define DEFAULT_MIN_COMPRESS_BLOCK_SIZE 65536
#define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576
/// Какими блоками по-умолчанию читаются и пишутся данные (в числе строк).
#define DEFAULT_BLOCK_SIZE 1048576
/// То же самое, но для операций слияния. Меньше DEFAULT_BLOCK_SIZE для экономии оперативки (так как читаются все столбцы).
/** Какими блоками по-умолчанию читаются данные (в числе строк).
* Меньшие значения дают лучшую кэш-локальность, меньшее потребление оперативки, но больший оверхед на обработку запроса.
*/
#define DEFAULT_BLOCK_SIZE 65536
/** Какие блоки следует формировать для вставки в таблицу, если мы управляем формированием блоков.
* (Иногда в таблицу вставляются ровно такие блоки, какие были считаны / переданы извне, и на их размер этот параметр не влияет.)
* Больше, чем DEFAULT_BLOCK_SIZE, так как в некоторых таблицах на каждый блок создаётся кусок данных на диске (довольно большая штука),
* и если бы куски были маленькими, то их было бы накладно потом объединять.
*/
#define DEFAULT_INSERT_BLOCK_SIZE 1048576
/** То же самое, но для операций слияния. Меньше DEFAULT_BLOCK_SIZE для экономии оперативки (так как читаются все столбцы).
* Сильно меньше, так как бывают 10-way слияния.
*/
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_MAX_QUERY_SIZE 65536

View File

@ -18,7 +18,7 @@ public:
BlockInputStreamFromRowInputStream(
RowInputStreamPtr row_input_,
const Block & sample_,
size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
size_t max_block_size_ = DEFAULT_INSERT_BLOCK_SIZE); /// Обычно дамп читается в целях вставки в таблицу.
void readPrefix() { row_input->readPrefix(); }
void readSuffix() { row_input->readSuffix(); }

View File

@ -33,6 +33,8 @@ struct Settings
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE) \
/** Максимальный размер блока для чтения */ \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE) \
/** Максимальный размер блока для вставки, если мы управляем формированием блоков для вставки. */ \
M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE) \
/** Максимальное количество потоков выполнения запроса */ \
M(SettingUInt64, max_threads, DEFAULT_MAX_THREADS) \
/** Максимальное количество соединений при распределённой обработке одного запроса (должно быть больше, чем max_threads). */ \

View File

@ -221,7 +221,7 @@ private:
format_max_block_size = config().getInt("format_max_block_size", DEFAULT_BLOCK_SIZE);
insert_format = "Values";
insert_format_max_block_size = config().getInt("insert_format_max_block_size", format_max_block_size);
insert_format_max_block_size = config().getInt("insert_format_max_block_size", DEFAULT_INSERT_BLOCK_SIZE);
connect();

View File

@ -68,11 +68,13 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
auto table_lock = table->lockStructure(true);
BlockInputStreamPtr in;
NamesAndTypesListPtr required_columns = new NamesAndTypesList (table->getSampleBlock().getColumnsList());
/// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку.
table->write(query_ptr);
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getSampleBlock().getColumnsList());
/// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку.
/// TODO Плохо - исправить.
table->write(query_ptr);
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
BlockOutputStreamPtr out = new AddingDefaultBlockOutputStream(new PushingToViewsBlockOutputStream(query.database, query.table, context, query_ptr), required_columns);
/// Какой тип запроса: INSERT VALUES | INSERT FORMAT | INSERT SELECT?
@ -98,7 +100,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
ConcatReadBuffer istr(buffers);
Block sample = table->getSampleBlock();
in = context.getFormatFactory().getInput(format, istr, sample, context.getSettings().max_block_size, context.getDataTypeFactory());
in = context.getFormatFactory().getInput(format, istr, sample, context.getSettings().max_insert_block_size, context.getDataTypeFactory());
copyData(*in, *out);
}
else
@ -122,7 +124,9 @@ BlockIO InterpreterInsertQuery::execute()
NamesAndTypesListPtr required_columns = new NamesAndTypesList(table->getSampleBlock().getColumnsList());
/// Надо убедиться, что запрос идет в таблицу, которая поддерживает вставку.
/// TODO Плохо - исправить.
table->write(query_ptr);
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
BlockOutputStreamPtr out = new AddingDefaultBlockOutputStream(new PushingToViewsBlockOutputStream(query.database, query.table, context, query_ptr), required_columns);

View File

@ -11,7 +11,7 @@ class InterserverIOHTTPHandler : public Poco::Net::HTTPRequestHandler
public:
InterserverIOHTTPHandler(Server & server_)
: server(server_)
, log(&Logger::get("InterserverIOHTTPHandler "))
, log(&Logger::get("InterserverIOHTTPHandler"))
{
}

View File

@ -598,7 +598,7 @@ void TCPHandler::initBlockInput()
"Native",
*state.maybe_compressed_in,
state.io.out_sample,
query_context.getSettingsRef().max_block_size,
query_context.getSettingsRef().max_insert_block_size, /// Реально не используется в формате Native.
query_context.getDataTypeFactory());
}
}

View File

@ -11,7 +11,6 @@
#include <DB/Parsers/formatAST.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/AddingDefaultBlockInputStream.h>