mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
fix segfault in ttl merge with non-physical columns in block
This commit is contained in:
parent
c3f0116602
commit
c0a63801fc
@ -26,6 +26,7 @@ TTLBlockInputStream::TTLBlockInputStream(
|
|||||||
, date_lut(DateLUT::instance())
|
, date_lut(DateLUT::instance())
|
||||||
{
|
{
|
||||||
children.push_back(input_);
|
children.push_back(input_);
|
||||||
|
header = children.at(0)->getHeader();
|
||||||
|
|
||||||
const auto & column_defaults = storage.getColumns().getDefaults();
|
const auto & column_defaults = storage.getColumns().getDefaults();
|
||||||
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
|
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
|
||||||
@ -58,11 +59,6 @@ TTLBlockInputStream::TTLBlockInputStream(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Block TTLBlockInputStream::getHeader() const
|
|
||||||
{
|
|
||||||
return children.at(0)->getHeader();
|
|
||||||
}
|
|
||||||
|
|
||||||
Block TTLBlockInputStream::readImpl()
|
Block TTLBlockInputStream::readImpl()
|
||||||
{
|
{
|
||||||
Block block = children.at(0)->read();
|
Block block = children.at(0)->read();
|
||||||
@ -108,11 +104,13 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
|
|||||||
const auto & current = block.getByName(storage.ttl_table_entry.result_column);
|
const auto & current = block.getByName(storage.ttl_table_entry.result_column);
|
||||||
const IColumn * ttl_column = current.column.get();
|
const IColumn * ttl_column = current.column.get();
|
||||||
|
|
||||||
|
const auto & column_names = header.getNames();
|
||||||
MutableColumns result_columns;
|
MutableColumns result_columns;
|
||||||
result_columns.reserve(getHeader().columns());
|
result_columns.reserve(column_names.size());
|
||||||
for (const auto & name : storage.getColumns().getNamesOfPhysical())
|
|
||||||
|
for (auto it = column_names.begin(); it != column_names.end(); ++it)
|
||||||
{
|
{
|
||||||
auto & column_with_type = block.getByName(name);
|
auto & column_with_type = block.getByName(*it);
|
||||||
const IColumn * values_column = column_with_type.column.get();
|
const IColumn * values_column = column_with_type.column.get();
|
||||||
MutableColumnPtr result_column = values_column->cloneEmpty();
|
MutableColumnPtr result_column = values_column->cloneEmpty();
|
||||||
result_column->reserve(block.rows());
|
result_column->reserve(block.rows());
|
||||||
@ -125,13 +123,13 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
|
|||||||
new_ttl_infos.table_ttl.update(cur_ttl);
|
new_ttl_infos.table_ttl.update(cur_ttl);
|
||||||
result_column->insertFrom(*values_column, i);
|
result_column->insertFrom(*values_column, i);
|
||||||
}
|
}
|
||||||
else
|
else if (it == column_names.begin())
|
||||||
++rows_removed;
|
++rows_removed;
|
||||||
}
|
}
|
||||||
result_columns.emplace_back(std::move(result_column));
|
result_columns.emplace_back(std::move(result_column));
|
||||||
}
|
}
|
||||||
|
|
||||||
block = getHeader().cloneWithColumns(std::move(result_columns));
|
block = header.cloneWithColumns(std::move(result_columns));
|
||||||
}
|
}
|
||||||
|
|
||||||
void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
|
void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
|
||||||
|
@ -21,7 +21,7 @@ public:
|
|||||||
|
|
||||||
String getName() const override { return "TTLBlockInputStream"; }
|
String getName() const override { return "TTLBlockInputStream"; }
|
||||||
|
|
||||||
Block getHeader() const override;
|
Block getHeader() const override { return header; };
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
Block readImpl() override;
|
Block readImpl() override;
|
||||||
@ -47,6 +47,8 @@ private:
|
|||||||
|
|
||||||
std::unordered_map<String, String> defaults_result_column;
|
std::unordered_map<String, String> defaults_result_column;
|
||||||
ExpressionActionsPtr defaults_expression;
|
ExpressionActionsPtr defaults_expression;
|
||||||
|
|
||||||
|
Block header;
|
||||||
private:
|
private:
|
||||||
/// Removes values with expired ttl and computes new min_ttl and empty_columns for part
|
/// Removes values with expired ttl and computes new min_ttl and empty_columns for part
|
||||||
void removeValuesWithExpiredColumnTTL(Block & block);
|
void removeValuesWithExpiredColumnTTL(Block & block);
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
0 0
|
0 0
|
||||||
0 0
|
0 0
|
||||||
|
5 6
|
||||||
2000-10-10 00:00:00 0
|
2000-10-10 00:00:00 0
|
||||||
2000-10-10 00:00:00 0
|
2000-10-10 00:00:00 0
|
||||||
2000-10-10 00:00:00 0
|
2000-10-10 00:00:00 0
|
||||||
|
@ -9,6 +9,17 @@ select a, b from ttl_00933_1;
|
|||||||
|
|
||||||
drop table if exists ttl_00933_1;
|
drop table if exists ttl_00933_1;
|
||||||
|
|
||||||
|
create table ttl_00933_1 (d DateTime, a Int, b Int) engine = MergeTree order by toDate(d) partition by tuple() ttl d + interval 1 second;
|
||||||
|
insert into ttl_00933_1 values (now(), 1, 2);
|
||||||
|
insert into ttl_00933_1 values (now(), 3, 4);
|
||||||
|
insert into ttl_00933_1 values (now() + 1000, 5, 6);
|
||||||
|
optimize table ttl_00933_1 final; -- check ttl merge for part with both expired and unexpired values
|
||||||
|
select sleep(1.1) format Null; -- wait if very fast merge happen
|
||||||
|
optimize table ttl_00933_1 final;
|
||||||
|
select a, b from ttl_00933_1;
|
||||||
|
|
||||||
|
drop table if exists ttl_00933_1;
|
||||||
|
|
||||||
create table ttl_00933_1 (d DateTime, a Int ttl d + interval 1 DAY) engine = MergeTree order by tuple() partition by toDayOfMonth(d);
|
create table ttl_00933_1 (d DateTime, a Int ttl d + interval 1 DAY) engine = MergeTree order by tuple() partition by toDayOfMonth(d);
|
||||||
insert into ttl_00933_1 values (toDateTime('2000-10-10 00:00:00'), 1);
|
insert into ttl_00933_1 values (toDateTime('2000-10-10 00:00:00'), 1);
|
||||||
insert into ttl_00933_1 values (toDateTime('2000-10-10 00:00:00'), 2);
|
insert into ttl_00933_1 values (toDateTime('2000-10-10 00:00:00'), 2);
|
||||||
|
Loading…
Reference in New Issue
Block a user