Merge pull request #15814 from ClickHouse/cleanups

Cleanups
This commit is contained in:
alexey-milovidov 2020-10-11 01:11:03 +03:00 committed by GitHub
commit 717c48cbf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 6 additions and 407 deletions

View File

@ -1,192 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnConst.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Common/typeid_cast.h>
#include <DataStreams/FilterBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_)
: remove_filter(remove_filter_)
, expression(std::move(expression_))
, filter_column_name(std::move(filter_column_name_))
{
children.push_back(input);
/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);
filter_column = header.getPositionByName(filter_column_name);
auto & column_elem = header.safeGetByPosition(filter_column);
/// Isn't the filter already constant?
if (column_elem.column)
constant_filter_description = ConstantFilterDescription(*column_elem.column);
if (!constant_filter_description.always_false
&& !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), 1u);
}
if (remove_filter)
header.erase(filter_column_name);
}
String FilterBlockInputStream::getName() const { return "Filter"; }
Block FilterBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->execute(totals);
return totals;
}
Block FilterBlockInputStream::getHeader() const
{
return header;
}
Block FilterBlockInputStream::readImpl()
{
Block res;
if (constant_filter_description.always_false)
return removeFilterIfNeed(std::move(res));
if (expression->checkColumnIsAlwaysFalse(filter_column_name))
return {};
/// Until non-empty block after filtering or end of stream.
while (true)
{
res = children.back()->read();
if (!res)
return res;
expression->execute(res);
if (constant_filter_description.always_true)
return removeFilterIfNeed(std::move(res));
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;
/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*column);
if (constant_filter_description.always_false)
{
res.clear();
return res;
}
if (constant_filter_description.always_true)
return removeFilterIfNeed(std::move(res));
FilterDescription filter_and_holder(*column);
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
*/
size_t first_non_constant_column = 0;
for (size_t i = 0; i < columns; ++i)
{
if (!isColumnConst(*res.safeGetByPosition(i).column))
{
first_non_constant_column = i;
if (first_non_constant_column != static_cast<size_t>(filter_column))
break;
}
}
size_t filtered_rows = 0;
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
filtered_rows = current_column.column->size();
}
else
{
filtered_rows = countBytesInFilter(*filter_and_holder.data);
}
/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
continue;
/// If all the rows pass through the filter.
if (filtered_rows == filter_and_holder.data->size())
{
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, 1u);
/// No need to touch the rest of the columns.
return removeFilterIfNeed(std::move(res));
}
/// Filter the rest of the columns.
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(i);
if (i == static_cast<size_t>(filter_column))
{
/// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
/// NOTE User could pass column with something different than 0 and 1 for filter.
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column.column = current_column.type->createColumnConst(filtered_rows, 1u);
continue;
}
if (i == first_non_constant_column)
continue;
if (isColumnConst(*current_column.column))
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
}
return removeFilterIfNeed(std::move(res));
}
}
Block FilterBlockInputStream::removeFilterIfNeed(Block && block) const
{
if (block && remove_filter)
block.erase(static_cast<size_t>(filter_column));
return std::move(block);
}
}

View File

@ -1,46 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Columns/FilterDescription.h>
namespace DB
{
class ExpressionActions;
/** Implements WHERE, HAVING operations.
* A stream of blocks and an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions, are passed as input.
* The expression is evaluated and a stream of blocks is returned, which contains only the filtered rows.
*/
class FilterBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_ = false);
String getName() const override;
Block getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
bool remove_filter;
private:
ExpressionActionsPtr expression;
Block header;
String filter_column_name;
ssize_t filter_column;
ConstantFilterDescription constant_filter_description;
Block removeFilterIfNeed(Block && block) const;
};
}

View File

@ -7,7 +7,6 @@ namespace DB
{
/** A stream of blocks from which you can read one block.
* Also see BlocksListBlockInputStream.
*/
class OneBlockInputStream : public IBlockInputStream
{

View File

@ -1,6 +1,7 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>

View File

@ -2,7 +2,6 @@
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Storages/StorageMaterializedView.h>

View File

@ -1,4 +0,0 @@
set(SRCS)
add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS})
target_link_libraries (finish_sorting_stream PRIVATE clickhouse_aggregate_functions dbms)

View File

@ -1,132 +0,0 @@
#include <iostream>
#include <iomanip>
#include <pcg_random.hpp>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <DataStreams/BlocksListBlockInputStream.h>
using namespace DB;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
int main(int argc, char ** argv)
{
pcg64 rng;
try
{
size_t m = argc >= 2 ? std::stol(argv[1]) : 2;
size_t n = argc >= 3 ? std::stol(argv[2]) : 10;
SortDescription sort_descr;
sort_descr.emplace_back("col1", 1, 1);
Block block_header;
BlocksList blocks;
for (size_t t = 0; t < m; ++t)
{
Block block;
for (size_t i = 0; i < 2; ++i)
{
ColumnWithTypeAndName column;
column.name = "col" + std::to_string(i + 1);
column.type = std::make_shared<DataTypeInt32>();
auto col = ColumnInt32::create();
auto & vec = col->getData();
vec.resize(n);
for (size_t j = 0; j < n; ++j)
vec[j] = rng() % 10;
column.column = std::move(col);
block.insert(column);
}
if (!block_header)
block_header = block.cloneEmpty();
sortBlock(block, sort_descr);
blocks.emplace_back(std::move(block));
}
auto blocks_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe source(std::make_shared<SourceFromInputStream>(std::move(blocks_stream)));
QueryPipeline pipeline;
pipeline.init(std::move(source));
pipeline.addTransform(std::make_shared<MergeSortingTransform>(pipeline.getHeader(), sort_descr, n, 0, 0, 0, nullptr, 0));
SortDescription sort_descr_final;
sort_descr_final.emplace_back("col1", 1, 1);
sort_descr_final.emplace_back("col2", 1, 1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinishSortingTransform>(header, sort_descr, sort_descr_final, n, 0);
});
auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
{
Stopwatch stopwatch;
stopwatch.start();
Block res_block = block_header;
while (Block block = stream->read())
{
for (size_t i = 0; i < block.columns(); ++i)
{
MutableColumnPtr ptr = IColumn::mutate(std::move(res_block.getByPosition(i).column));
ptr->insertRangeFrom(*block.getByPosition(i).column.get(), 0, block.rows());
}
}
if (res_block.rows() != n * m)
throw Exception("Result block size mismatch", ErrorCodes::LOGICAL_ERROR);
const auto & columns = res_block.getColumns();
for (size_t i = 1; i < res_block.rows(); ++i)
for (const auto & col : columns)
{
int res = col->compareAt(i - 1, i, *col, 1);
if (res < 0)
break;
else if (res > 0)
throw Exception("Result stream not sorted", ErrorCodes::LOGICAL_ERROR);
}
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
}
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
return -1;
}
return 0;
}

View File

@ -25,7 +25,6 @@ SRCS(
DistinctSortedBlockInputStream.cpp
ExecutionSpeedLimits.cpp
ExpressionBlockInputStream.cpp
FilterBlockInputStream.cpp
finalizeBlock.cpp
IBlockInputStream.cpp
InputStreamFromASTInsertQuery.cpp

View File

@ -15,8 +15,6 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/FieldToDataType.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>

View File

@ -97,7 +97,6 @@ auto createLocalPipe(
/* Now we don't need to materialize constants, because RemoteBlockInputStream will ignore constant and take it from header.
* So, streams from different threads will always have the same header.
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
pipeline.setMaxThreads(1);
return QueryPipeline::getPipe(std::move(pipeline));

View File

@ -16,7 +16,6 @@ limitations under the License. */
#include <Interpreters/Context.h>
#include <Access/AccessFlags.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/StreamLocalLimits.h>

View File

@ -5,7 +5,8 @@
#include <Core/Block.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
{

View File

@ -10,12 +10,12 @@
#include <DataStreams/materializeBlock.h>
#include <DataStreams/TemporaryFileStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <DataStreams/BlocksListBlockInputStream.h>
namespace DB
{

View File

@ -3,7 +3,7 @@
#include <Interpreters/IJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/LazyBlockInputStream.h>
namespace DB
{

View File

@ -440,7 +440,7 @@ static void setRow(Row & row, const ColumnRawPtrs & raw_columns, size_t row_num,
if (i < column_names.size())
column_name = column_names[i];
throw Exception("MergingSortedBlockInputStream failed to read row " + toString(row_num)
throw Exception("SummingSortedAlgorithm failed to read row " + toString(row_num)
+ " of column " + toString(i) + (column_name.empty() ? "" : " (" + column_name + ")"),
ErrorCodes::CORRUPTED_DATA);
}

View File

@ -2,7 +2,6 @@
#include <Storages/Kafka/parseSyslogLevel.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>

View File

@ -16,7 +16,6 @@ limitations under the License. */
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/LiveView/StorageLiveView.h>

View File

@ -16,7 +16,6 @@ limitations under the License. */
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/BlocksSource.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>

View File

@ -1,7 +1,6 @@
#include <Storages/RabbitMQ/StorageRabbitMQ.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>

View File

@ -1,7 +1,5 @@
#include <Storages/StorageDistributed.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Databases/IDatabase.h>
#include <Disks/StoragePolicy.h>
#include <Disks/DiskLocal.h>

View File

@ -17,7 +17,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>

View File

@ -17,7 +17,6 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <Common/parseGlobs.h>
#include <Poco/URI.h>

View File

@ -4,7 +4,6 @@
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Pipe.h>

View File

@ -47,7 +47,6 @@
#include <Interpreters/Context.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Poco/DirectoryIterator.h>

View File

@ -18,7 +18,6 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>

View File

@ -1,7 +1,6 @@
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageValues.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Pipe.h>

View File

@ -5,7 +5,6 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/NullBlockInputStream.h>
#include <Storages/VirtualColumnUtils.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTSelectQuery.h>

View File

@ -3,7 +3,6 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataStreams/OneBlockInputStream.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/StorageSystemPartsBase.h>

View File

@ -1,4 +1,3 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemDisks.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/Context.h>

View File

@ -3,7 +3,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/VirtualColumnUtils.h>

View File

@ -7,7 +7,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>

View File

@ -6,7 +6,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/VirtualColumnUtils.h>
#include <Access/ContextAccess.h>

View File

@ -6,7 +6,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>

View File

@ -2,7 +2,6 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemReplicas.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>

View File

@ -2,7 +2,6 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Processors/Sources/SourceFromSingleChunk.h>

View File

@ -3,7 +3,6 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>