Fixed errors in clang (part 3) [#METR-2807].

This commit is contained in:
Alexey Milovidov 2015-01-21 07:17:02 +03:00
parent 27543ea7e5
commit 8a1c22bf41
6 changed files with 116 additions and 105 deletions

View File

@ -31,7 +31,7 @@ public:
}
AddingDefaultBlockOutputStream(BlockOutputStreamPtr output_, NamesAndTypesListPtr required_columns_, const Context & context_)
: AddingDefaultBlockOutputStream{output_, required_columns_, {}, context_}
: AddingDefaultBlockOutputStream{output_, required_columns_, ColumnDefaults{}, context_}
{
}

View File

@ -18,15 +18,7 @@ class DistinctBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Пустой columns_ значит все столбцы.
DistinctBlockInputStream(BlockInputStreamPtr input_, const Limits & limits, size_t limit_, Names columns_)
: columns_names(columns_),
limit(limit_),
max_rows(limits.max_rows_in_distinct),
max_bytes(limits.max_bytes_in_distinct),
overflow_mode(limits.distinct_overflow_mode)
{
children.push_back(input_);
}
DistinctBlockInputStream(BlockInputStreamPtr input_, const Limits & limits, size_t limit_, Names columns_);
String getName() const override { return "DistinctBlockInputStream"; }
@ -38,98 +30,7 @@ public:
}
protected:
Block readImpl() override
{
/// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится.
while (1)
{
/// Если уже прочитали достаточно строк - то больше читать не будем.
if (limit && set.size() >= limit)
return Block();
Block block = children[0]->read();
if (!block)
return Block();
size_t rows = block.rows();
size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
ConstColumnPlainPtrs column_ptrs(columns);
for (size_t i = 0; i < columns; ++i)
{
if (columns_names.empty())
column_ptrs[i] = block.getByPosition(i).column;
else
column_ptrs[i] = block.getByName(columns_names[i]).column;
}
/// Будем фильтровать блок, оставляя там только строки, которых мы ещё не видели.
IColumn::Filter filter(rows);
size_t old_set_size = set.size();
for (size_t i = 0; i < rows; ++i)
{
/** Уникальность строк будем отслеживать с помощью множества значений SipHash128.
* Делается несколько допущений.
* 1. Допускается неточная работа в случае коллизий SipHash128.
* 2. Допускается неточная работа, если строковые поля содержат нулевые байты.
* 3. Не поддерживаются массивы.
*
* Для оптимизации, можно добавить другие методы из Set.h.
*/
UInt128 key;
SipHash hash;
for (size_t j = 0; j < columns; ++j)
{
StringRef data = column_ptrs[j]->getDataAtWithTerminatingZero(i);
hash.update(data.data, data.size);
}
hash.get128(key.first, key.second);
/// Если вставилось в множество - строчку оставляем, иначе - удаляем.
filter[i] = set.insert(key).second;
if (limit && set.size() == limit)
{
memset(&filter[i + 1], 0, (rows - (i + 1)) * sizeof(IColumn::Filter::value_type));
break;
}
}
/// Если ни одной новой строки не было в блоке - перейдём к следующему блоку.
if (set.size() == old_set_size)
continue;
if (!checkLimits())
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("DISTINCT-Set size limit exceeded."
" Rows: " + toString(set.size()) +
", limit: " + toString(max_rows) +
". Bytes: " + toString(set.getBufferSizeInBytes()) +
", limit: " + toString(max_bytes) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (overflow_mode == OverflowMode::BREAK)
return Block();
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(filter);
return block;
}
}
Block readImpl() override;
private:
bool checkLimits() const

View File

@ -23,7 +23,7 @@ namespace DB
class TableFunctionMerge: public ITableFunction
{
public:
std::string getName() const { return "merge"; }
std::string getName() const override { return "merge"; }
StoragePtr execute(ASTPtr ast_function, Context & context) const override
{

View File

@ -25,7 +25,7 @@ public:
/// Максимальное количество различных шардов и максимальное количество реплик одного шарда
const size_t MAX_ADDRESSES = 200;
std::string getName() const { return "remote"; }
std::string getName() const override { return "remote"; }
StoragePtr execute(ASTPtr ast_function, Context & context) const override
{

View File

@ -0,0 +1,110 @@
#include <DB/DataStreams/DistinctBlockInputStream.h>
namespace DB
{
DistinctBlockInputStream::DistinctBlockInputStream(BlockInputStreamPtr input_, const Limits & limits, size_t limit_, Names columns_)
: columns_names(columns_),
limit(limit_),
max_rows(limits.max_rows_in_distinct),
max_bytes(limits.max_bytes_in_distinct),
overflow_mode(limits.distinct_overflow_mode)
{
children.push_back(input_);
}
Block DistinctBlockInputStream::readImpl()
{
/// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится.
while (1)
{
/// Если уже прочитали достаточно строк - то больше читать не будем.
if (limit && set.size() >= limit)
return Block();
Block block = children[0]->read();
if (!block)
return Block();
size_t rows = block.rows();
size_t columns = columns_names.empty() ? block.columns() : columns_names.size();
ConstColumnPlainPtrs column_ptrs(columns);
for (size_t i = 0; i < columns; ++i)
{
if (columns_names.empty())
column_ptrs[i] = block.getByPosition(i).column;
else
column_ptrs[i] = block.getByName(columns_names[i]).column;
}
/// Будем фильтровать блок, оставляя там только строки, которых мы ещё не видели.
IColumn::Filter filter(rows);
size_t old_set_size = set.size();
for (size_t i = 0; i < rows; ++i)
{
/** Уникальность строк будем отслеживать с помощью множества значений SipHash128.
* Делается несколько допущений.
* 1. Допускается неточная работа в случае коллизий SipHash128.
* 2. Допускается неточная работа, если строковые поля содержат нулевые байты.
* 3. Не поддерживаются массивы.
*
* Для оптимизации, можно добавить другие методы из Set.h.
*/
UInt128 key;
SipHash hash;
for (size_t j = 0; j < columns; ++j)
{
StringRef data = column_ptrs[j]->getDataAtWithTerminatingZero(i);
hash.update(data.data, data.size);
}
hash.get128(key.first, key.second);
/// Если вставилось в множество - строчку оставляем, иначе - удаляем.
filter[i] = set.insert(key).second;
if (limit && set.size() == limit)
{
memset(&filter[i + 1], 0, (rows - (i + 1)) * sizeof(IColumn::Filter::value_type));
break;
}
}
/// Если ни одной новой строки не было в блоке - перейдём к следующему блоку.
if (set.size() == old_set_size)
continue;
if (!checkLimits())
{
if (overflow_mode == OverflowMode::THROW)
throw Exception("DISTINCT-Set size limit exceeded."
" Rows: " + toString(set.size()) +
", limit: " + toString(max_rows) +
". Bytes: " + toString(set.getBufferSizeInBytes()) +
", limit: " + toString(max_bytes) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (overflow_mode == OverflowMode::BREAK)
return Block();
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
size_t all_columns = block.columns();
for (size_t i = 0; i < all_columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(filter);
return block;
}
}
}

View File

@ -325,7 +325,7 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
const auto & tmp_column = block.getByName(col_decl_ptr->name + "_tmp");
/// type mismatch between explicitly specified and deduced type, add conversion
if (typeid(*name_and_type_ptr->type) != typeid(*tmp_column.type))
if (name_and_type_ptr->type->getName() != tmp_column.type->getName())
{
col_decl_ptr->default_expression = makeASTFunction(
"to" + name_and_type_ptr->type->getName(),