This commit is contained in:
Evgeniy Gatov 2014-06-25 17:07:03 +04:00
commit 63848e805c
8 changed files with 88 additions and 100 deletions

View File

@ -25,17 +25,15 @@ public:
typedef std::list<ColumnWithNameAndType> Container_t;
typedef std::vector<Container_t::iterator> IndexByPosition_t;
typedef std::map<String, Container_t::iterator> IndexByName_t;
private:
Container_t data;
IndexByPosition_t index_by_position;
IndexByName_t index_by_name;
void rebuildIndexByPosition();
public:
Block() {}
/// нужны, чтобы правильно скопировались индексы
Block(const Block & other);
Block(Block && other) = default;
@ -79,14 +77,14 @@ public:
/** То же самое, но без проверки - берёт количество строк из первого столбца, если он есть или возвращает 0.
*/
size_t rowsInFirstColumn() const;
size_t columns() const;
size_t columns() const { return index_by_position.size(); }
/// Приблизительное количество байт в оперативке - для профайлинга.
size_t bytes() const;
operator bool() const { return !data.empty(); }
bool operator!() const { return data.empty(); }
operator bool() const { return !index_by_position.empty(); }
bool operator!() const { return index_by_position.empty(); }
/** Получить список имён столбцов через запятую. */
std::string dumpNames() const;
@ -96,7 +94,7 @@ public:
/** Получить такой же блок, но пустой. */
Block cloneEmpty() const;
/** Заменяет столбцы смещений внутри вложенных таблиц на один общий для таблицы.
* Кидает исключение, если эти смещения вдруг оказались неодинаковы.
*/

View File

@ -13,6 +13,9 @@
#define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
#define DBMS_DEFAULT_POLL_INTERVAL 10
/// Насколько секунд можно максимально задерживать вставку в таблицу типа MergeTree, если в ней много недомердженных кусков.
#define DBMS_MAX_DELAY_OF_INSERT 200.0
/// При записи данных, для сжатия выделяется буфер размером max_compress_block_size. При переполнении буфера или если в буфер
/// записано данных больше или равно, чем min_compress_block_size, то при очередной засечке, данные так же будут сжиматься
/// В результате, для маленьких столбцов (числа 1-8 байт), при index_granularity = 8192, размер блока будет 64 KБ.

View File

@ -164,7 +164,6 @@ class ExpressionActions
public:
typedef std::vector<ExpressionAction> Actions;
ExpressionActions(const NamesAndTypesList & input_columns_, const Settings & settings_)
: input_columns(input_columns_), settings(settings_)
{

View File

@ -194,20 +194,20 @@ private:
void getArrayJoinedColumns();
void getArrayJoinedColumnsImpl(ASTPtr ast);
void addMultipleArrayJoinAction(ExpressionActions & actions);
void addMultipleArrayJoinAction(ExpressionActionsPtr & actions);
void addJoinAction(ExpressionActions & actions, bool only_types);
void addJoinAction(ExpressionActionsPtr & actions, bool only_types);
struct ScopeStack;
void getActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ScopeStack & actions_stack);
void getRootActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActions & actions);
void getRootActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions);
void getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionActions & actions, bool no_subqueries);
void getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries);
/// Добавить агрегатные функции в aggregate_descriptions.
/// Установить has_aggregation = true, если есть хоть одна агрегатная функция.
void getAggregatesImpl(ASTPtr ast, ExpressionActions & actions);
void getAggregatesImpl(ASTPtr ast, ExpressionActionsPtr & actions);
/** Получить множество нужных столбцов для чтения из таблицы.
* При этом, столбцы, указанные в ignored_names, считаются ненужными. И параметр ignored_names может модифицироваться.

View File

@ -38,28 +38,19 @@ void Block::addDefaults(NamesAndTypesListPtr required_columns)
Block & Block::operator= (const Block & other)
{
data = other.data;
rebuildIndexByPosition();
index_by_name.clear();
for (IndexByName_t::const_iterator it = other.index_by_name.begin(); it != other.index_by_name.end(); ++it)
{
Container_t::iterator value = data.begin();
std::advance(value, std::distance(const_cast<Block&>(other).data.begin(), it->second));
index_by_name[it->first] = value;
}
return *this;
}
void Block::rebuildIndexByPosition()
{
index_by_position.resize(data.size());
index_by_name.clear();
size_t pos = 0;
for (Container_t::iterator it = data.begin(); it != data.end(); ++it, ++pos)
{
index_by_position[pos] = it;
}
index_by_name[it->name] = it;
}
return *this;
}
void Block::insert(size_t position, const ColumnWithNameAndType & elem)
{
@ -72,7 +63,7 @@ void Block::insert(size_t position, const ColumnWithNameAndType & elem)
insert(elem);
return;
}
Container_t::iterator it = data.insert(index_by_position[position], elem);
index_by_name[elem.name] = it;
@ -104,7 +95,7 @@ void Block::erase(size_t position)
if (position >= index_by_position.size())
throw Exception("Position out of bound in Block::erase(), max position = "
+ toString(index_by_position.size()), ErrorCodes::POSITION_OUT_OF_BOUND);
Container_t::iterator it = index_by_position[position];
index_by_name.erase(index_by_name.find(it->name));
data.erase(it);
@ -122,12 +113,12 @@ void Block::erase(const String & name)
if (index_it == index_by_name.end())
throw Exception("No such name in Block::erase(): '"
+ name + "'", ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
Container_t::iterator it = index_it->second;
index_by_name.erase(index_it);
size_t position = std::distance(data.begin(), it);
data.erase(it);
for (size_t i = position, size = index_by_position.size() - 1; i < size; ++i)
index_by_position[i] = index_by_position[i + 1];
@ -142,7 +133,7 @@ ColumnWithNameAndType & Block::getByPosition(size_t position)
+ " is out of bound in Block::getByPosition(), max position = "
+ toString(index_by_position.size() - 1)
+ ", there are columns: " + dumpNames(), ErrorCodes::POSITION_OUT_OF_BOUND);
return *index_by_position[position];
}
@ -154,7 +145,7 @@ const ColumnWithNameAndType & Block::getByPosition(size_t position) const
+ " is out of bound in Block::getByPosition(), max position = "
+ toString(index_by_position.size() - 1)
+ ", there are columns: " + dumpNames(), ErrorCodes::POSITION_OUT_OF_BOUND);
return *index_by_position[position];
}
@ -227,12 +218,6 @@ size_t Block::rowsInFirstColumn() const
}
size_t Block::columns() const
{
return data.size();
}
size_t Block::bytes() const
{
size_t res = 0;
@ -302,15 +287,15 @@ void Block::checkNestedArraysOffsets() const
/// Указатели на столбцы-массивы, для проверки равенства столбцов смещений во вложенных структурах данных
typedef std::map<String, const ColumnArray *> ArrayColumns;
ArrayColumns array_columns;
for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it)
{
const ColumnWithNameAndType & column = *it;
if (const ColumnArray * column_array = dynamic_cast<const ColumnArray *>(&*column.column))
{
String name = DataTypeNested::extractNestedTableName(column.name);
ArrayColumns::const_iterator it = array_columns.find(name);
if (array_columns.end() == it)
array_columns[name] = column_array;
@ -329,15 +314,15 @@ void Block::optimizeNestedArraysOffsets()
/// Указатели на столбцы-массивы, для проверки равенства столбцов смещений во вложенных структурах данных
typedef std::map<String, ColumnArray *> ArrayColumns;
ArrayColumns array_columns;
for (Container_t::iterator it = data.begin(); it != data.end(); ++it)
{
ColumnWithNameAndType & column = *it;
if (ColumnArray * column_array = dynamic_cast<ColumnArray *>(&*column.column))
{
String name = DataTypeNested::extractNestedTableName(column.name);
ArrayColumns::const_iterator it = array_columns.find(name);
if (array_columns.end() == it)
array_columns[name] = column_array;
@ -345,7 +330,7 @@ void Block::optimizeNestedArraysOffsets()
{
if (!it->second->hasEqualOffsets(*column_array))
throw Exception("Sizes of nested arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
/// делаем так, чтобы столбцы смещений массивов внутри одной вложенной таблицы указывали в одно место
column_array->getOffsetsColumn() = it->second->getOffsetsColumn();
}

View File

@ -55,16 +55,9 @@ void NativeBlockOutputStream::write(const Block & block)
writeVarUInt(columns, ostr);
writeVarUInt(rows, ostr);
/** Если есть столбцы-константы - то материализуем их.
* (Так как тип данных не умеет сериализовывать/десериализовывать константы.)
*/
Block materialized_block = block;
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & column = materialized_block.getByPosition(i);
if (column.column->isConst())
column.column = dynamic_cast<const IColumnConst &>(*column.column).convertToFullColumn();
const ColumnWithNameAndType & column = block.getByPosition(i);
/// Имя
writeStringBinary(column.name, ostr);
@ -73,7 +66,15 @@ void NativeBlockOutputStream::write(const Block & block)
writeStringBinary(column.type->getName(), ostr);
/// Данные
writeData(*column.type, *column.column, ostr);
/** Если есть столбцы-константы - то материализуем их.
* (Так как тип данных не умеет сериализовывать/десериализовывать константы.)
*/
ColumnPtr col = column.column->isConst()
? static_cast<const IColumnConst &>(*column.column).convertToFullColumn()
: column.column;
writeData(*column.type, *col, ostr);
}
}

View File

@ -93,7 +93,7 @@ void ExpressionAnalyzer::init()
if (select_query && (select_query->group_expression_list || select_query->having_expression))
has_aggregation = true;
ExpressionActions temp_actions(columns, settings);
ExpressionActionsPtr temp_actions = new ExpressionActions(columns, settings);
if (select_query && select_query->array_join_expression_list)
{
@ -123,7 +123,7 @@ void ExpressionAnalyzer::init()
getRootActionsImpl(group_asts[i], true, false, temp_actions);
NameAndTypePair key;
key.first = group_asts[i]->getColumnName();
key.second = temp_actions.getSampleBlock().getByName(key.first).type;
key.second = temp_actions->getSampleBlock().getByName(key.first).type;
aggregation_keys.push_back(key);
if (!unique_keys.count(key.first))
@ -142,7 +142,7 @@ void ExpressionAnalyzer::init()
}
else
{
aggregated_columns = temp_actions.getSampleBlock().getColumnsList();
aggregated_columns = temp_actions->getSampleBlock().getColumnsList();
}
}
@ -697,14 +697,15 @@ struct ExpressionAnalyzer::ScopeStack
Levels stack;
Settings settings;
ScopeStack(const ExpressionActions & actions, const Settings & settings_)
ScopeStack(const ExpressionActionsPtr & actions, const Settings & settings_)
: settings(settings_)
{
stack.push_back(Level());
stack.back().actions = new ExpressionActions(actions);
const NamesAndTypesList & input_columns = actions.getSampleBlock().getColumnsList();
for (NamesAndTypesList::const_iterator it = input_columns.begin(); it != input_columns.end(); ++it)
stack.back().new_columns.insert(it->first);
stack.back().actions = actions;
const Block & sample_block = actions->getSampleBlock();
for (size_t i = 0, size = sample_block.columns(); i < size; ++i)
stack.back().new_columns.insert(sample_block.unsafeGetByPosition(i).name);
}
void pushLevel(const NamesAndTypesList & input_columns)
@ -712,8 +713,6 @@ struct ExpressionAnalyzer::ScopeStack
stack.push_back(Level());
Level & prev = stack[stack.size() - 2];
ColumnsWithNameAndType prev_columns = prev.actions->getSampleBlock().getColumns();
ColumnsWithNameAndType all_columns;
NameSet new_names;
@ -724,10 +723,12 @@ struct ExpressionAnalyzer::ScopeStack
stack.back().new_columns.insert(it->first);
}
for (ColumnsWithNameAndType::const_iterator it = prev_columns.begin(); it != prev_columns.end(); ++it)
const Block & prev_sample_block = prev.actions->getSampleBlock();
for (size_t i = 0, size = prev_sample_block.columns(); i < size; ++i)
{
if (!new_names.count(it->name))
all_columns.push_back(*it);
const ColumnWithNameAndType & col = prev_sample_block.unsafeGetByPosition(i);
if (!new_names.count(col.name))
all_columns.push_back(col);
}
stack.back().actions = new ExpressionActions(all_columns, settings);
@ -778,11 +779,11 @@ struct ExpressionAnalyzer::ScopeStack
};
void ExpressionAnalyzer::getRootActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActions & actions)
void ExpressionAnalyzer::getRootActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ExpressionActionsPtr & actions)
{
ScopeStack scopes(actions, settings);
getActionsImpl(ast, no_subqueries, only_consts, scopes);
actions = *scopes.popLevel();
actions = scopes.popLevel();
}
@ -1124,7 +1125,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
}
void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActions & actions)
void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActionsPtr & actions)
{
ASTFunction * node = dynamic_cast<ASTFunction *>(&*ast);
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
@ -1145,7 +1146,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActions & actio
{
getRootActionsImpl(arguments[i], true, false, actions);
const std::string & name = arguments[i]->getColumnName();
types[i] = actions.getSampleBlock().getByName(name).type;
types[i] = actions->getSampleBlock().getByName(name).type;
aggregate.argument_names[i] = name;
}
@ -1205,17 +1206,17 @@ void ExpressionAnalyzer::initChain(ExpressionActionsChain & chain, NamesAndTypes
}
}
void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActions & actions)
void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions)
{
NameSet result_columns;
for (NameToNameMap::iterator it = array_join_result_to_source.begin(); it != array_join_result_to_source.end(); ++it)
{
if (it->first != it->second)
actions.add(ExpressionAction::copyColumn(it->second, it->first));
actions->add(ExpressionAction::copyColumn(it->second, it->first));
result_columns.insert(it->first);
}
actions.add(ExpressionAction::arrayJoin(result_columns));
actions->add(ExpressionAction::arrayJoin(result_columns));
}
bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types)
@ -1228,16 +1229,16 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
initChain(chain, columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(select_query->array_join_expression_list, only_types, false, *step.actions);
getRootActionsImpl(select_query->array_join_expression_list, only_types, false, step.actions);
addMultipleArrayJoinAction(*step.actions);
addMultipleArrayJoinAction(step.actions);
return true;
}
void ExpressionAnalyzer::addJoinAction(ExpressionActions & actions, bool only_types)
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only_types)
{
actions.add(ExpressionAction::ordinaryJoin(only_types ? nullptr : joins[0], columns_added_by_join));
actions->add(ExpressionAction::ordinaryJoin(only_types ? nullptr : joins[0], columns_added_by_join));
}
bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
@ -1251,7 +1252,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
ExpressionActionsChain::Step & step = chain.steps.back();
ASTJoin & ast_join = dynamic_cast<ASTJoin &>(*select_query->join);
getRootActionsImpl(ast_join.using_expr_list, only_types, false, *step.actions);
getRootActionsImpl(ast_join.using_expr_list, only_types, false, step.actions);
{
Names join_key_names(join_key_names_set.begin(), join_key_names_set.end());
@ -1285,7 +1286,7 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
joins.push_back(join);
}
addJoinAction(*step.actions, false);
addJoinAction(step.actions, false);
return true;
}
@ -1301,7 +1302,7 @@ bool ExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, bool only_t
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->where_expression->getColumnName());
getRootActionsImpl(select_query->where_expression, only_types, false, *step.actions);
getRootActionsImpl(select_query->where_expression, only_types, false, step.actions);
return true;
}
@ -1320,7 +1321,7 @@ bool ExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain, bool only
for (size_t i = 0; i < asts.size(); ++i)
{
step.required_output.push_back(asts[i]->getColumnName());
getRootActionsImpl(asts[i], only_types, false, *step.actions);
getRootActionsImpl(asts[i], only_types, false, step.actions);
}
return true;
@ -1341,13 +1342,13 @@ void ExpressionAnalyzer::appendAggregateFunctionsArguments(ExpressionActionsChai
}
}
getActionsBeforeAggregationImpl(select_query->select_expression_list, *step.actions, only_types);
getActionsBeforeAggregationImpl(select_query->select_expression_list, step.actions, only_types);
if (select_query->having_expression)
getActionsBeforeAggregationImpl(select_query->having_expression, *step.actions, only_types);
getActionsBeforeAggregationImpl(select_query->having_expression, step.actions, only_types);
if (select_query->order_expression_list)
getActionsBeforeAggregationImpl(select_query->order_expression_list, *step.actions, only_types);
getActionsBeforeAggregationImpl(select_query->order_expression_list, step.actions, only_types);
}
bool ExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types)
@ -1361,7 +1362,7 @@ bool ExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_
ExpressionActionsChain::Step & step = chain.steps.back();
step.required_output.push_back(select_query->having_expression->getColumnName());
getRootActionsImpl(select_query->having_expression, only_types, false, *step.actions);
getRootActionsImpl(select_query->having_expression, only_types, false, step.actions);
return true;
}
@ -1373,7 +1374,7 @@ void ExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain, bool only_
initChain(chain, aggregated_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(select_query->select_expression_list, only_types, false, *step.actions);
getRootActionsImpl(select_query->select_expression_list, only_types, false, step.actions);
ASTs asts = select_query->select_expression_list->children;
for (size_t i = 0; i < asts.size(); ++i)
@ -1392,7 +1393,7 @@ bool ExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, bool only
initChain(chain, aggregated_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
getRootActionsImpl(select_query->order_expression_list, only_types, false, *step.actions);
getRootActionsImpl(select_query->order_expression_list, only_types, false, step.actions);
ASTs asts = select_query->order_expression_list->children;
for (size_t i = 0; i < asts.size(); ++i)
@ -1445,7 +1446,7 @@ Block ExpressionAnalyzer::getSelectSampleBlock()
{
assertSelect();
ExpressionActions temp_actions(aggregated_columns, settings);
ExpressionActionsPtr temp_actions = new ExpressionActions(aggregated_columns, settings);
NamesWithAliases result_columns;
ASTs asts = select_query->select_expression_list->children;
@ -1455,12 +1456,12 @@ Block ExpressionAnalyzer::getSelectSampleBlock()
getRootActionsImpl(asts[i], true, false, temp_actions);
}
temp_actions.add(ExpressionAction::project(result_columns));
temp_actions->add(ExpressionAction::project(result_columns));
return temp_actions.getSampleBlock();
return temp_actions->getSampleBlock();
}
void ExpressionAnalyzer::getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionActions & actions, bool no_subqueries)
void ExpressionAnalyzer::getActionsBeforeAggregationImpl(ASTPtr ast, ExpressionActionsPtr & actions, bool no_subqueries)
{
ASTFunction * node = dynamic_cast<ASTFunction *>(&*ast);
if (node && node->kind == ASTFunction::AGGREGATE_FUNCTION)
@ -1505,7 +1506,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool project_result)
alias = name;
result_columns.push_back(NameWithAlias(name, alias));
result_names.push_back(alias);
getRootActionsImpl(asts[i], false, false, *actions);
getRootActionsImpl(asts[i], false, false, actions);
}
if (project_result)
@ -1529,7 +1530,7 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
{
ExpressionActionsPtr actions = new ExpressionActions(NamesAndTypesList(), settings);
getRootActionsImpl(ast, true, true, *actions);
getRootActionsImpl(ast, true, true, actions);
return actions;
}

View File

@ -725,7 +725,8 @@ void MergeTreeData::delayInsertIfNeeded()
{
double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert);
delay /= 1000;
delay = std::min(delay, 5 * 60.); /// Ограничим задержку 5 минутами.
delay = std::min(delay, DBMS_MAX_DELAY_OF_INSERT);
LOG_INFO(log, "Delaying inserting block by "
<< std::fixed << std::setprecision(4) << delay << "s because there are " << parts_count << " parts");
std::this_thread::sleep_for(std::chrono::duration<double>(delay));