mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 17:50:47 +00:00
Merge pull request #1921 from yandex/header-in-input-streams
Header in input streams
This commit is contained in:
commit
d73080640d
2
contrib/zookeeper
vendored
2
contrib/zookeeper
vendored
@ -1 +1 @@
|
||||
Subproject commit 438afae5af36c5be9c82d074f43a9bb19e0797c0
|
||||
Subproject commit 5aa9e889fe9e739af3c2a00222d9a3a0a57179dd
|
@ -164,8 +164,7 @@ public:
|
||||
{
|
||||
const auto cond_arg = arguments[i].get();
|
||||
if (!typeid_cast<const DataTypeUInt8 *>(cond_arg))
|
||||
throw Exception{
|
||||
"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) +
|
||||
throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) +
|
||||
" of aggregate function " + derived().getName() + ", must be UInt8",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
|
||||
}
|
||||
|
@ -40,9 +40,6 @@ struct UInt128
|
||||
bool inline operator> (const UInt128 rhs) const { return tuple() > rhs.tuple(); }
|
||||
bool inline operator>= (const UInt128 rhs) const { return tuple() >= rhs.tuple(); }
|
||||
|
||||
/** Types who are stored at the moment in the database have no more than 64bits and can be handle
|
||||
* inside an unique UInt64.
|
||||
*/
|
||||
template <typename T> bool inline operator== (const T rhs) const { return *this == UInt128(rhs); }
|
||||
template <typename T> bool inline operator!= (const T rhs) const { return *this != UInt128(rhs); }
|
||||
template <typename T> bool inline operator>= (const T rhs) const { return *this >= UInt128(rhs); }
|
||||
|
@ -29,6 +29,7 @@ STRONG_TYPEDEF(TupleBackend, Tuple); /// Array and Tuple are different types wit
|
||||
|
||||
|
||||
/** 32 is enough. Round number is used for alignment and for better arithmetic inside std::vector.
|
||||
* NOTE: Actually, sizeof(std::string) is 32 when using libc++, so Field is 40 bytes.
|
||||
*/
|
||||
#define DBMS_MIN_FIELD_SIZE 32
|
||||
|
||||
|
@ -18,7 +18,7 @@ namespace DB
|
||||
|
||||
std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what)
|
||||
{
|
||||
stream << "IBlockInputStream(id = " << what.getID() << ", name = " << what.getName() << ")";
|
||||
stream << "IBlockInputStream(name = " << what.getName() << ")";
|
||||
//what.dumpTree(stream); // todo: set const
|
||||
return stream;
|
||||
}
|
||||
@ -115,7 +115,6 @@ std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what
|
||||
std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what)
|
||||
{
|
||||
stream << "SubqueryForSet(source = " << what.source
|
||||
<< ", source_sample = " << what.source_sample
|
||||
// TODO: << ", set = " << what.set << ", join = " << what.join
|
||||
<< ", table = " << what.table
|
||||
<< ")";
|
||||
|
@ -24,11 +24,11 @@ public:
|
||||
|
||||
String getName() const override { return "AddingConstColumn"; }
|
||||
|
||||
String getID() const override
|
||||
Block getHeader() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "AddingConstColumn(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
Block res = children.back()->getHeader();
|
||||
res.insert({data_type->createColumn(), data_type, column_name});
|
||||
return res;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -14,6 +14,11 @@ namespace ProfileEvents
|
||||
namespace DB
|
||||
{
|
||||
|
||||
Block AggregatingBlockInputStream::getHeader() const
|
||||
{
|
||||
return aggregator.getHeader(final);
|
||||
}
|
||||
|
||||
|
||||
Block AggregatingBlockInputStream::readImpl()
|
||||
{
|
||||
@ -42,7 +47,7 @@ Block AggregatingBlockInputStream::readImpl()
|
||||
|
||||
if (!isCancelled())
|
||||
{
|
||||
/// Flush data in the RAM to disk also. It's easier.
|
||||
/// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
|
||||
if (data_variants->size())
|
||||
aggregator.writeToTemporaryFile(*data_variants);
|
||||
}
|
||||
@ -63,9 +68,8 @@ Block AggregatingBlockInputStream::readImpl()
|
||||
}
|
||||
}
|
||||
|
||||
Block res;
|
||||
if (isCancelled() || !impl)
|
||||
return res;
|
||||
return {};
|
||||
|
||||
return impl->read();
|
||||
}
|
||||
|
@ -30,12 +30,7 @@ public:
|
||||
|
||||
String getName() const override { return "Aggregating"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Aggregating(" << children.back()->getID() << ", " << aggregator.getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -28,26 +28,8 @@ public:
|
||||
|
||||
String getName() const override { return "AggregatingSorted"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "AggregatingSorted(inputs";
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << ", " << children[i]->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
bool isGroupedOutput() const override { return true; }
|
||||
bool isSortedOutput() const override { return true; }
|
||||
const SortDescription & getSortDescription() const override { return description; }
|
||||
|
||||
protected:
|
||||
/// Can return 1 more records than max_block_size.
|
||||
|
@ -35,13 +35,6 @@ public:
|
||||
|
||||
String getName() const override { return "Asynchronous"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Asynchronous(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
void readPrefix() override
|
||||
{
|
||||
/// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread.
|
||||
@ -80,6 +73,9 @@ public:
|
||||
}
|
||||
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
|
||||
~AsynchronousBlockInputStream() override
|
||||
{
|
||||
if (started)
|
||||
|
@ -5,7 +5,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Adds to one thread additional block information that is specified
|
||||
/** Adds to one stream additional block information that is specified
|
||||
* as the constructor parameter.
|
||||
*/
|
||||
class BlockExtraInfoInputStream : public IProfilingBlockInputStream
|
||||
@ -24,12 +24,7 @@ public:
|
||||
|
||||
String getName() const override { return "BlockExtraInfoInput"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "BlockExtraInfoInput(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return children.back()->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
|
@ -21,7 +21,6 @@ struct BlockIO
|
||||
BlockInputStreamPtr in;
|
||||
BlockOutputStreamPtr out;
|
||||
|
||||
Block in_sample; /// Example of a block to be read from `in`.
|
||||
Block out_sample; /// Example of a block to be written to `out`.
|
||||
|
||||
/// Callbacks for query logging could be set here.
|
||||
@ -51,7 +50,6 @@ struct BlockIO
|
||||
process_list_entry = rhs.process_list_entry;
|
||||
in = rhs.in;
|
||||
out = rhs.out;
|
||||
in_sample = rhs.in_sample;
|
||||
out_sample = rhs.out_sample;
|
||||
|
||||
finish_callback = rhs.finish_callback;
|
||||
|
@ -29,15 +29,10 @@ public:
|
||||
|
||||
String getName() const override { return "BlockInputStreamFromRowInputStream"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << this;
|
||||
return res.str();
|
||||
}
|
||||
|
||||
RowInputStreamPtr & getRowInput() { return row_input; }
|
||||
|
||||
Block getHeader() const override { return sample; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
|
@ -22,13 +22,6 @@ public:
|
||||
|
||||
String getName() const override { return "BlocksList"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << this;
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
|
@ -20,11 +20,6 @@ String CastTypeBlockInputStream::getName() const
|
||||
return "CastType";
|
||||
}
|
||||
|
||||
String CastTypeBlockInputStream::getID() const
|
||||
{
|
||||
return "CastType(" + children.back()->getID() + ")";
|
||||
}
|
||||
|
||||
Block CastTypeBlockInputStream::readImpl()
|
||||
{
|
||||
Block block = children.back()->read();
|
||||
|
@ -17,7 +17,7 @@ public:
|
||||
|
||||
String getName() const override;
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return ref_definition; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -28,26 +28,11 @@ public:
|
||||
|
||||
String getName() const override { return "CollapsingFinal"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "CollapsingFinal(inputs";
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << ", " << children[i]->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ", sign_column, " << sign_column_name << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
bool isSortedOutput() const override { return true; }
|
||||
const SortDescription & getSortDescription() const override { return description; }
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
|
@ -33,23 +33,6 @@ public:
|
||||
|
||||
String getName() const override { return "CollapsingSorted"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "CollapsingSorted(inputs";
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << ", " << children[i]->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ", sign_column, " << sign_column << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Can return 1 more records than max_block_size.
|
||||
Block readImpl() override;
|
||||
|
@ -30,19 +30,6 @@ ColumnGathererStream::ColumnGathererStream(
|
||||
}
|
||||
|
||||
|
||||
String ColumnGathererStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
|
||||
res << getName() << "(";
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << (i == 0 ? "" : ", " ) << children[i]->getID();
|
||||
res << ")";
|
||||
|
||||
return res.str();
|
||||
}
|
||||
|
||||
|
||||
void ColumnGathererStream::init()
|
||||
{
|
||||
sources.reserve(children.size());
|
||||
@ -107,13 +94,13 @@ void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getID() + ", part " + toString(source_num));
|
||||
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
|
||||
throw;
|
||||
}
|
||||
|
||||
if (0 == source.size)
|
||||
{
|
||||
throw Exception("Fetched block is empty. Stream " + children[source_num]->getID() + ", part " + toString(source_num),
|
||||
throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
|
||||
ErrorCodes::RECEIVED_EMPTY_DATA);
|
||||
}
|
||||
}
|
||||
|
@ -61,12 +61,12 @@ public:
|
||||
|
||||
String getName() const override { return "ColumnGatherer"; }
|
||||
|
||||
String getID() const override;
|
||||
|
||||
Block readImpl() override;
|
||||
|
||||
void readSuffixImpl() override;
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
/// for use in implementations of IColumn::gather()
|
||||
template <typename Column>
|
||||
void gather(Column & column_res);
|
||||
|
@ -22,24 +22,7 @@ public:
|
||||
|
||||
String getName() const override { return "Concat"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Concat(";
|
||||
|
||||
Strings children_ids(children.size());
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
children_ids[i] = children[i]->getID();
|
||||
|
||||
/// Let's assume that the order of concatenation of blocks does not matter.
|
||||
std::sort(children_ids.begin(), children_ids.end());
|
||||
|
||||
for (size_t i = 0; i < children_ids.size(); ++i)
|
||||
res << (i == 0 ? "" : ", ") << children_ids[i];
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
|
@ -35,24 +35,7 @@ public:
|
||||
|
||||
String getName() const override { return "CreatingSets"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "CreatingSets(";
|
||||
|
||||
Strings children_ids(children.size());
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
children_ids[i] = children[i]->getID();
|
||||
|
||||
/// Let's assume that the order of creating sets does not matter.
|
||||
std::sort(children_ids.begin(), children_ids.end() - 1);
|
||||
|
||||
for (size_t i = 0; i < children_ids.size(); ++i)
|
||||
res << (i == 0 ? "" : ", ") << children_ids[i];
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return children.back()->getHeader(); }
|
||||
|
||||
/// Takes `totals` only from the main source, not from subquery sources.
|
||||
const Block & getTotals() override;
|
||||
|
@ -18,13 +18,6 @@ DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & i
|
||||
children.push_back(input);
|
||||
}
|
||||
|
||||
String DistinctBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Distinct(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
Block DistinctBlockInputStream::readImpl()
|
||||
{
|
||||
/// Execute until end of stream or until
|
||||
|
@ -22,7 +22,7 @@ public:
|
||||
|
||||
String getName() const override { return "Distinct"; }
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -19,13 +19,6 @@ DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(const BlockInputS
|
||||
children.push_back(input);
|
||||
}
|
||||
|
||||
String DistinctSortedBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "DistinctSorted(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
Block DistinctSortedBlockInputStream::readImpl()
|
||||
{
|
||||
/// Execute until end of stream or until
|
||||
|
@ -25,7 +25,7 @@ public:
|
||||
|
||||
String getName() const override { return "DistinctSorted"; }
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -13,13 +13,6 @@ ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr
|
||||
|
||||
String ExpressionBlockInputStream::getName() const { return "Expression"; }
|
||||
|
||||
String ExpressionBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Expression(" << children.back()->getID() << ", " << expression->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
const Block & ExpressionBlockInputStream::getTotals()
|
||||
{
|
||||
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
|
||||
@ -31,14 +24,19 @@ const Block & ExpressionBlockInputStream::getTotals()
|
||||
return totals;
|
||||
}
|
||||
|
||||
Block ExpressionBlockInputStream::getHeader() const
|
||||
{
|
||||
Block res = children.back()->getHeader();
|
||||
expression->execute(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
Block ExpressionBlockInputStream::readImpl()
|
||||
{
|
||||
Block res = children.back()->read();
|
||||
if (!res)
|
||||
return res;
|
||||
|
||||
expression->execute(res);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -22,8 +22,8 @@ public:
|
||||
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
|
||||
|
||||
String getName() const override;
|
||||
String getID() const override;
|
||||
const Block & getTotals() override;
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -23,24 +23,40 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
|
||||
children.push_back(input);
|
||||
}
|
||||
|
||||
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_)
|
||||
: expression(expression_), filter_column(-1), filter_column_name(filter_column_name_)
|
||||
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name)
|
||||
: expression(expression_)
|
||||
{
|
||||
children.push_back(input);
|
||||
|
||||
/// Determine position of filter column.
|
||||
header = input->getHeader();
|
||||
expression->execute(header);
|
||||
|
||||
filter_column = header.getPositionByName(filter_column_name);
|
||||
|
||||
/// Isn't the filter already constant?
|
||||
ColumnPtr column = header.safeGetByPosition(filter_column).column;
|
||||
|
||||
if (!have_constant_filter_description)
|
||||
{
|
||||
have_constant_filter_description = true;
|
||||
if (column)
|
||||
constant_filter_description = ConstantFilterDescription(*column);
|
||||
}
|
||||
|
||||
if (!constant_filter_description.always_false
|
||||
&& !constant_filter_description.always_true)
|
||||
{
|
||||
/// Replace the filter column to a constant with value 1.
|
||||
auto header_filter_elem = header.getByPosition(filter_column);
|
||||
header_filter_elem.column = header_filter_elem.type->createColumnConst(header.rows(), UInt64(1));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
String FilterBlockInputStream::getName() const { return "Filter"; }
|
||||
|
||||
|
||||
String FilterBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Filter(" << children.back()->getID() << ", " << expression->getID() << ", " << filter_column << ", " << filter_column_name << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
|
||||
const Block & FilterBlockInputStream::getTotals()
|
||||
{
|
||||
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
|
||||
@ -53,34 +69,19 @@ const Block & FilterBlockInputStream::getTotals()
|
||||
}
|
||||
|
||||
|
||||
Block FilterBlockInputStream::getHeader() const
|
||||
{
|
||||
return header;
|
||||
}
|
||||
|
||||
|
||||
Block FilterBlockInputStream::readImpl()
|
||||
{
|
||||
Block res;
|
||||
|
||||
if (is_first)
|
||||
if (!have_constant_filter_description)
|
||||
{
|
||||
is_first = false;
|
||||
|
||||
const Block & sample_block = expression->getSampleBlock();
|
||||
|
||||
/// Find the current position of the filter column in the block.
|
||||
/** sample_block has the result structure of evaluating the expression.
|
||||
* But this structure does not necessarily match expression->execute(res) below,
|
||||
* because the expression can be applied to a block that also contains additional,
|
||||
* columns unnecessary for this expression, but needed later, in the next stages of the query execution pipeline.
|
||||
* There will be no such columns in sample_block.
|
||||
* Therefore, the position of the filter column in it can be different.
|
||||
*/
|
||||
ssize_t filter_column_in_sample_block = filter_column;
|
||||
if (filter_column_in_sample_block == -1)
|
||||
filter_column_in_sample_block = sample_block.getPositionByName(filter_column_name);
|
||||
|
||||
/// Let's check if the filter column is a constant containing 0 or 1.
|
||||
ColumnPtr column = sample_block.safeGetByPosition(filter_column_in_sample_block).column;
|
||||
|
||||
if (column)
|
||||
constant_filter_description = ConstantFilterDescription(*column);
|
||||
|
||||
getHeader();
|
||||
if (constant_filter_description.always_false)
|
||||
return res;
|
||||
}
|
||||
@ -97,10 +98,6 @@ Block FilterBlockInputStream::readImpl()
|
||||
if (constant_filter_description.always_true)
|
||||
return res;
|
||||
|
||||
/// Find the current position of the filter column in the block.
|
||||
if (filter_column == -1)
|
||||
filter_column = res.getPositionByName(filter_column_name);
|
||||
|
||||
size_t columns = res.columns();
|
||||
ColumnPtr column = res.safeGetByPosition(filter_column).column;
|
||||
|
||||
|
@ -25,20 +25,19 @@ public:
|
||||
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
|
||||
|
||||
String getName() const override;
|
||||
String getID() const override;
|
||||
const Block & getTotals() override;
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
private:
|
||||
ExpressionActionsPtr expression;
|
||||
Block header;
|
||||
ssize_t filter_column;
|
||||
String filter_column_name;
|
||||
|
||||
bool is_first = true;
|
||||
|
||||
ConstantFilterDescription constant_filter_description;
|
||||
bool have_constant_filter_description = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -3,16 +3,16 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
String FilterColumnsBlockInputStream::getID() const
|
||||
Block FilterColumnsBlockInputStream::getHeader() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "FilterColumnsBlockInputStream(" << children.back()->getID();
|
||||
Block block = children.back()->getHeader();
|
||||
Block filtered;
|
||||
|
||||
for (const auto & it : columns_to_save)
|
||||
res << ", " << it;
|
||||
if (throw_if_column_not_found || block.has(it))
|
||||
filtered.insert(std::move(block.getByName(it)));
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
return filtered;
|
||||
}
|
||||
|
||||
Block FilterColumnsBlockInputStream::readImpl()
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
return "FilterColumnsBlockInputStream";
|
||||
}
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -59,7 +59,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
|
||||
|
||||
if (name == "Native")
|
||||
{
|
||||
return std::make_shared<NativeBlockInputStream>(buf);
|
||||
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
|
||||
}
|
||||
else if (name == "RowBinary")
|
||||
{
|
||||
|
@ -135,23 +135,6 @@ public:
|
||||
|
||||
String getName() const override { return "GraphiteRollupSorted"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "GraphiteRollupSorted(inputs";
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << ", " << children[i]->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
~GraphiteRollupSortedBlockInputStream()
|
||||
{
|
||||
if (aggregate_state_created)
|
||||
|
@ -64,6 +64,7 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
|
||||
ostr << String(indent, ' ') << getName();
|
||||
if (multiplier > 1)
|
||||
ostr << " × " << multiplier;
|
||||
// ostr << ": " << getHeader().dumpStructure();
|
||||
ostr << std::endl;
|
||||
++indent;
|
||||
|
||||
@ -125,13 +126,5 @@ void IBlockInputStream::getLeavesImpl(BlockInputStreams & res, const BlockInputS
|
||||
(*it)->getLeavesImpl(res, *it);
|
||||
}
|
||||
|
||||
/// By default all instances is different streams
|
||||
String IBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << getName() << "(" << this << ")";
|
||||
return res.str();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
@ -48,6 +48,12 @@ class IBlockInputStream : private boost::noncopyable
|
||||
public:
|
||||
IBlockInputStream() {}
|
||||
|
||||
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
|
||||
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
|
||||
* It is guaranteed that method "read" returns blocks of exactly that structure.
|
||||
*/
|
||||
virtual Block getHeader() const = 0;
|
||||
|
||||
/** Read next block.
|
||||
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
|
||||
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
|
||||
@ -76,14 +82,6 @@ public:
|
||||
*/
|
||||
virtual String getName() const = 0;
|
||||
|
||||
/** The unique identifier of the pipeline part of the query execution.
|
||||
* Sources with the same identifier are considered identical
|
||||
* (producing the same data), and can be replaced by one source
|
||||
* if several queries are executed simultaneously.
|
||||
* If the source can not be glued together with any other - return the object's address as an identifier.
|
||||
*/
|
||||
virtual String getID() const;
|
||||
|
||||
/// If this stream generates data in grouped by some keys, return true.
|
||||
virtual bool isGroupedOutput() const { return false; }
|
||||
/// If this stream generates data in order by some keys, return true.
|
||||
|
@ -1,20 +1,16 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/ASTInsertQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <IO/ConcatReadBuffer.h>
|
||||
#include <Parsers/IAST.h>
|
||||
#include <DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DataStreams/BlockIO.h>
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
struct BlockIO;
|
||||
class Context;
|
||||
|
||||
/** Prepares an input stream which produce data containing in INSERT query
|
||||
* Head of inserting data could be stored in INSERT ast directly
|
||||
@ -23,7 +19,6 @@ namespace ErrorCodes
|
||||
class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
|
||||
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
|
||||
|
||||
Block readImpl() override { return res_stream->read(); }
|
||||
@ -31,10 +26,10 @@ public:
|
||||
void readSuffixImpl() override { return res_stream->readSuffix(); }
|
||||
|
||||
String getName() const override { return "InputStreamFromASTInsertQuery"; }
|
||||
String getID() const override { return "InputStreamFromASTInsertQuery(" + toString(std::intptr_t(this)) + ")"; }
|
||||
|
||||
Block getHeader() const override { return res_stream->getHeader(); }
|
||||
|
||||
private:
|
||||
|
||||
std::unique_ptr<ReadBuffer> input_buffer_ast_part;
|
||||
std::unique_ptr<ReadBuffer> input_buffer_contacenated;
|
||||
|
||||
|
@ -15,14 +15,13 @@ class LazyBlockInputStream : public IProfilingBlockInputStream
|
||||
public:
|
||||
using Generator = std::function<BlockInputStreamPtr()>;
|
||||
|
||||
LazyBlockInputStream(Generator generator_)
|
||||
: generator(std::move(generator_))
|
||||
LazyBlockInputStream(const Block & header_, Generator generator_)
|
||||
: header(header_), generator(std::move(generator_))
|
||||
{
|
||||
}
|
||||
|
||||
LazyBlockInputStream(const char * name_, Generator generator_)
|
||||
: name(name_)
|
||||
, generator(std::move(generator_))
|
||||
LazyBlockInputStream(const char * name_, const Block & header_, Generator generator_)
|
||||
: name(name_), header(header_), generator(std::move(generator_))
|
||||
{
|
||||
}
|
||||
|
||||
@ -34,6 +33,11 @@ public:
|
||||
IProfilingBlockInputStream::cancel();
|
||||
}
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
return header;
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
@ -89,6 +93,7 @@ protected:
|
||||
|
||||
private:
|
||||
const char * name = "Lazy";
|
||||
Block header;
|
||||
Generator generator;
|
||||
|
||||
BlockInputStreamPtr input;
|
||||
|
@ -21,12 +21,7 @@ public:
|
||||
|
||||
String getName() const override { return "Limit"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Limit(" << children.back()->getID() << ", " << limit << ", " << offset << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -3,9 +3,9 @@
|
||||
#include <DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
#include <Common/HashTable/HashMap.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/UInt128.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -22,6 +22,8 @@ public:
|
||||
|
||||
String getName() const override { return "LimitBy"; }
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
|
@ -15,11 +15,9 @@ String MaterializingBlockInputStream::getName() const
|
||||
return "Materializing";
|
||||
}
|
||||
|
||||
String MaterializingBlockInputStream::getID() const
|
||||
Block MaterializingBlockInputStream::getHeader() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Materializing(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
return materializeBlock(children.back()->getHeader());
|
||||
}
|
||||
|
||||
Block MaterializingBlockInputStream::readImpl()
|
||||
|
@ -12,7 +12,7 @@ class MaterializingBlockInputStream : public IProfilingBlockInputStream
|
||||
public:
|
||||
MaterializingBlockInputStream(const BlockInputStreamPtr & input);
|
||||
String getName() const override;
|
||||
String getID() const override;
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -33,12 +33,13 @@ public:
|
||||
size_t max_merged_block_size_, size_t limit_ = 0);
|
||||
|
||||
String getName() const override { return "MergeSortingBlocks"; }
|
||||
String getID() const override { return getName(); }
|
||||
|
||||
bool isGroupedOutput() const override { return true; }
|
||||
bool isSortedOutput() const override { return true; }
|
||||
const SortDescription & getSortDescription() const override { return description; }
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
@ -80,22 +81,12 @@ public:
|
||||
|
||||
String getName() const override { return "MergeSorting"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "MergeSorting(" << children.back()->getID();
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
bool isGroupedOutput() const override { return true; }
|
||||
bool isSortedOutput() const override { return true; }
|
||||
const SortDescription & getSortDescription() const override { return description; }
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
@ -129,7 +120,7 @@ private:
|
||||
BlockInputStreamPtr block_in;
|
||||
|
||||
TemporaryFileStream(const std::string & path)
|
||||
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in)) {}
|
||||
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, 0)) {}
|
||||
};
|
||||
|
||||
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
|
||||
|
@ -6,6 +6,11 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
Block MergingAggregatedBlockInputStream::getHeader() const
|
||||
{
|
||||
return aggregator.getHeader(final);
|
||||
}
|
||||
|
||||
|
||||
Block MergingAggregatedBlockInputStream::readImpl()
|
||||
{
|
||||
|
@ -22,12 +22,7 @@ public:
|
||||
|
||||
String getName() const override { return "MergingAggregated"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator.getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -90,14 +90,9 @@ MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficie
|
||||
}
|
||||
|
||||
|
||||
String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
|
||||
Block MergingAggregatedMemoryEfficientBlockInputStream::getHeader() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "MergingAggregatedMemoryEfficient(" << aggregator.getID();
|
||||
for (size_t i = 0, size = children.size(); i < size; ++i)
|
||||
res << ", " << children.back()->getID();
|
||||
res << ")";
|
||||
return res.str();
|
||||
return aggregator.getHeader(final);
|
||||
}
|
||||
|
||||
|
||||
|
@ -67,8 +67,6 @@ public:
|
||||
|
||||
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
|
||||
|
||||
String getID() const override;
|
||||
|
||||
/// Sends the request (initiates calculations) earlier than `read`.
|
||||
void readPrefix() override;
|
||||
|
||||
@ -80,6 +78,8 @@ public:
|
||||
*/
|
||||
void cancel() override;
|
||||
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
|
@ -24,28 +24,6 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(
|
||||
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
||||
}
|
||||
|
||||
String MergingSortedBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "MergingSorted(";
|
||||
|
||||
Strings children_ids(children.size());
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
children_ids[i] = children[i]->getID();
|
||||
|
||||
/// The order does not matter.
|
||||
std::sort(children_ids.begin(), children_ids.end());
|
||||
|
||||
for (size_t i = 0; i < children_ids.size(); ++i)
|
||||
res << (i == 0 ? "" : ", ") << children_ids[i];
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns)
|
||||
{
|
||||
/// Read the first blocks, initialize the queue.
|
||||
|
@ -70,12 +70,12 @@ public:
|
||||
|
||||
String getName() const override { return "MergingSorted"; }
|
||||
|
||||
String getID() const override;
|
||||
|
||||
bool isGroupedOutput() const override { return true; }
|
||||
bool isSortedOutput() const override { return true; }
|
||||
const SortDescription & getSortDescription() const override { return description; }
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
struct RowRef
|
||||
{
|
||||
|
@ -19,23 +19,40 @@ namespace ErrorCodes
|
||||
extern const int INCORRECT_INDEX;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
NativeBlockInputStream::NativeBlockInputStream(
|
||||
ReadBuffer & istr_, UInt64 server_revision_,
|
||||
bool use_index_,
|
||||
|
||||
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
|
||||
: istr(istr_), server_revision(server_revision_)
|
||||
{
|
||||
}
|
||||
|
||||
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
|
||||
: istr(istr_), header(header_), server_revision(server_revision_)
|
||||
{
|
||||
}
|
||||
|
||||
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_end_)
|
||||
: istr(istr_), server_revision(server_revision_),
|
||||
use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_)
|
||||
use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_)
|
||||
{
|
||||
if (use_index)
|
||||
{
|
||||
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
|
||||
if (!istr_concrete)
|
||||
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
|
||||
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
|
||||
if (!istr_concrete)
|
||||
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
index_column_it = index_block_it->columns.begin();
|
||||
if (index_block_it == index_block_end)
|
||||
return;
|
||||
|
||||
index_column_it = index_block_it->columns.begin();
|
||||
|
||||
/// Initialize header from the index.
|
||||
for (const auto & column : index_block_it->columns)
|
||||
{
|
||||
auto type = DataTypeFactory::instance().get(column.type);
|
||||
header.insert({ type->createColumn(), type, column.name });
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,6 +67,12 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
|
||||
}
|
||||
|
||||
|
||||
Block NativeBlockInputStream::getHeader() const
|
||||
{
|
||||
return header;
|
||||
}
|
||||
|
||||
|
||||
Block NativeBlockInputStream::readImpl()
|
||||
{
|
||||
Block res;
|
||||
|
@ -60,35 +60,33 @@ struct IndexForNativeFormat
|
||||
class NativeBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
/** If a non-zero server_revision is specified, additional block information may be expected and read.
|
||||
*
|
||||
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
|
||||
*/
|
||||
NativeBlockInputStream(
|
||||
ReadBuffer & istr_, UInt64 server_revision_ = 0,
|
||||
bool use_index_ = false,
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_it_ = IndexForNativeFormat::Blocks::const_iterator{},
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_end_ = IndexForNativeFormat::Blocks::const_iterator{});
|
||||
/// If a non-zero server_revision is specified, additional block information may be expected and read.
|
||||
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
|
||||
|
||||
/// For cases when data structure (header) is known in advance.
|
||||
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
|
||||
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
|
||||
|
||||
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
|
||||
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_end_);
|
||||
|
||||
String getName() const override { return "Native"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << this;
|
||||
return res.str();
|
||||
}
|
||||
|
||||
static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint);
|
||||
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
private:
|
||||
ReadBuffer & istr;
|
||||
Block header;
|
||||
UInt64 server_revision;
|
||||
|
||||
bool use_index;
|
||||
bool use_index = false;
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_it;
|
||||
IndexForNativeFormat::Blocks::const_iterator index_block_end;
|
||||
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;
|
||||
|
@ -28,12 +28,7 @@ public:
|
||||
|
||||
String getName() const override { return "NullAndDoCopy"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "copy from " << input->getID();
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return {}; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
|
@ -6,14 +6,19 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Empty stream of blocks.
|
||||
/** Empty stream of blocks of specified structure.
|
||||
*/
|
||||
class NullBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
Block read() override { return Block(); }
|
||||
NullBlockInputStream(const Block & header) : header(header) {}
|
||||
|
||||
Block read() override { return {}; }
|
||||
Block getHeader() const override { return header; }
|
||||
String getName() const override { return "Null"; }
|
||||
|
||||
private:
|
||||
Block header;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -17,18 +17,12 @@ namespace ErrorCodes
|
||||
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
|
||||
const BlockInputStreamPtr & input,
|
||||
const Block & in_sample_, const Block & out_sample_)
|
||||
: header(out_sample_)
|
||||
{
|
||||
buildActions(in_sample_, out_sample_);
|
||||
children.push_back(input);
|
||||
}
|
||||
|
||||
String NullableAdapterBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "NullableAdapterBlockInputStream(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
Block NullableAdapterBlockInputStream::readImpl()
|
||||
{
|
||||
Block block = children.back()->read();
|
||||
|
@ -18,12 +18,11 @@ namespace DB
|
||||
class NullableAdapterBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_,
|
||||
const Block & out_sample_);
|
||||
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, const Block & out_sample_);
|
||||
|
||||
String getName() const override { return "NullableAdapterBlockInputStream"; }
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return header; }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
@ -52,6 +51,7 @@ private:
|
||||
void buildActions(const Block & in_sample, const Block & out_sample);
|
||||
|
||||
private:
|
||||
Block header;
|
||||
Actions actions;
|
||||
std::vector<std::optional<String>> rename;
|
||||
bool must_transform = false;
|
||||
|
@ -16,6 +16,14 @@ public:
|
||||
|
||||
String getName() const override { return "One"; }
|
||||
|
||||
Block getHeader() const override
|
||||
{
|
||||
Block res;
|
||||
for (const auto & elem : block)
|
||||
res.insert({ elem.column->cloneEmpty(), elem.type, elem.name });
|
||||
return res;
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
|
@ -20,13 +20,13 @@ public:
|
||||
children.push_back(stream);
|
||||
}
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
private:
|
||||
Block readImpl() override { return stream->read(); }
|
||||
|
||||
String getName() const override { return "Owning"; }
|
||||
|
||||
String getID() const override { return "Owning(" + stream->getID() + ")"; }
|
||||
|
||||
protected:
|
||||
BlockInputStreamPtr stream;
|
||||
std::unique_ptr<OwnType> own;
|
||||
|
@ -29,23 +29,9 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
|
||||
}
|
||||
|
||||
|
||||
String ParallelAggregatingBlockInputStream::getID() const
|
||||
Block ParallelAggregatingBlockInputStream::getHeader() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "ParallelAggregating(";
|
||||
|
||||
Strings children_ids(children.size());
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
children_ids[i] = children[i]->getID();
|
||||
|
||||
/// Order does not matter.
|
||||
std::sort(children_ids.begin(), children_ids.end());
|
||||
|
||||
for (size_t i = 0; i < children_ids.size(); ++i)
|
||||
res << (i == 0 ? "" : ", ") << children_ids[i];
|
||||
|
||||
res << ", " << aggregator.getID() << ")";
|
||||
return res.str();
|
||||
return aggregator.getHeader(final);
|
||||
}
|
||||
|
||||
|
||||
@ -122,8 +108,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
|
||||
{
|
||||
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
|
||||
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns,
|
||||
parent.threads_data[thread_num].key_sizes, parent.threads_data[thread_num].key,
|
||||
parent.no_more_keys);
|
||||
parent.threads_data[thread_num].key, parent.no_more_keys);
|
||||
|
||||
parent.threads_data[thread_num].src_rows += block.rows();
|
||||
parent.threads_data[thread_num].src_bytes += block.bytes();
|
||||
|
@ -27,10 +27,10 @@ public:
|
||||
|
||||
String getName() const override { return "ParallelAggregating"; }
|
||||
|
||||
String getID() const override;
|
||||
|
||||
void cancel() override;
|
||||
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
/// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor.
|
||||
void readPrefix() override
|
||||
@ -83,14 +83,12 @@ private:
|
||||
StringRefs key;
|
||||
ColumnRawPtrs key_columns;
|
||||
Aggregator::AggregateColumns aggregate_columns;
|
||||
Sizes key_sizes;
|
||||
|
||||
ThreadData(size_t keys_size, size_t aggregates_size)
|
||||
{
|
||||
key.resize(keys_size);
|
||||
key_columns.resize(keys_size);
|
||||
aggregate_columns.resize(aggregates_size);
|
||||
key_sizes.resize(keys_size);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -23,22 +23,12 @@ public:
|
||||
|
||||
String getName() const override { return "PartialSorting"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "PartialSorting(" << children.back()->getID();
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
bool isGroupedOutput() const override { return true; }
|
||||
bool isSortedOutput() const override { return true; }
|
||||
const SortDescription & getSortDescription() const override { return description; }
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
#include <Common/NetException.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
|
||||
@ -17,9 +18,9 @@ namespace ErrorCodes
|
||||
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
Connection & connection,
|
||||
const String & query_, const Context & context_, const Settings * settings,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
|
||||
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
{
|
||||
if (settings)
|
||||
context.setSettings(*settings);
|
||||
@ -32,9 +33,9 @@ RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
std::vector<IConnectionPool::Entry> && connections,
|
||||
const String & query_, const Context & context_, const Settings * settings,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
|
||||
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
{
|
||||
if (settings)
|
||||
context.setSettings(*settings);
|
||||
@ -48,9 +49,9 @@ RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(
|
||||
const ConnectionPoolWithFailoverPtr & pool,
|
||||
const String & query_, const Context & context_, const Settings * settings,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
|
||||
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
|
||||
{
|
||||
if (settings)
|
||||
context.setSettings(*settings);
|
||||
@ -148,6 +149,25 @@ void RemoteBlockInputStream::sendExternalTables()
|
||||
multiplexed_connections->sendExternalTablesData(external_tables_data);
|
||||
}
|
||||
|
||||
|
||||
/** If we receive a block with slightly different column types, or with excessive columns,
|
||||
* we will adapt it to expected structure.
|
||||
*/
|
||||
static Block adaptBlockStructure(const Block & block, const Block & header, const Context & context)
|
||||
{
|
||||
/// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
|
||||
if (!header)
|
||||
return block;
|
||||
|
||||
Block res;
|
||||
res.info = block.info;
|
||||
|
||||
for (const auto & elem : header)
|
||||
res.insert({ castColumn(block.getByName(elem.name), elem.type, context), elem.type, elem.name });
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
Block RemoteBlockInputStream::readImpl()
|
||||
{
|
||||
if (!sent_query)
|
||||
@ -170,7 +190,7 @@ Block RemoteBlockInputStream::readImpl()
|
||||
case Protocol::Server::Data:
|
||||
/// If the block is not empty and is not a header block
|
||||
if (packet.block && (packet.block.rows() > 0))
|
||||
return packet.block;
|
||||
return adaptBlockStructure(packet.block, header, context);
|
||||
break; /// If the block is empty - we will receive other packets before EndOfStream.
|
||||
|
||||
case Protocol::Server::Exception:
|
||||
|
@ -24,7 +24,7 @@ public:
|
||||
/// If `settings` is nullptr, settings will be taken from context.
|
||||
RemoteBlockInputStream(
|
||||
Connection & connection,
|
||||
const String & query_, const Context & context_, const Settings * settings = nullptr,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
|
||||
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
@ -32,7 +32,7 @@ public:
|
||||
/// If `settings` is nullptr, settings will be taken from context.
|
||||
RemoteBlockInputStream(
|
||||
std::vector<IConnectionPool::Entry> && connections,
|
||||
const String & query_, const Context & context_, const Settings * settings = nullptr,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
|
||||
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
@ -40,7 +40,7 @@ public:
|
||||
/// If `settings` is nullptr, settings will be taken from context.
|
||||
RemoteBlockInputStream(
|
||||
const ConnectionPoolWithFailoverPtr & pool,
|
||||
const String & query_, const Context & context_, const Settings * settings = nullptr,
|
||||
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
|
||||
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
@ -66,18 +66,13 @@ public:
|
||||
|
||||
String getName() const override { return "Remote"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << this;
|
||||
return res.str();
|
||||
}
|
||||
|
||||
BlockExtraInfo getBlockExtraInfo() const override
|
||||
{
|
||||
return multiplexed_connections->getBlockExtraInfo();
|
||||
}
|
||||
|
||||
Block getHeader() const override { return header; }
|
||||
|
||||
protected:
|
||||
/// Send all temporary tables to remote servers
|
||||
void sendExternalTables();
|
||||
@ -95,10 +90,14 @@ protected:
|
||||
private:
|
||||
void sendQuery();
|
||||
|
||||
Block receiveBlock();
|
||||
|
||||
/// If wasn't sent yet, send request to cancell all connections to replicas
|
||||
void tryCancel(const char * reason);
|
||||
|
||||
private:
|
||||
Block header;
|
||||
|
||||
std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections;
|
||||
|
||||
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
|
||||
|
@ -22,16 +22,15 @@ public:
|
||||
|
||||
String getName() const override { return "RemoveColumns"; }
|
||||
|
||||
String getID() const override
|
||||
Block getHeader() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "RemoveColumns(" << children.back()->getID();
|
||||
Block res = children.back()->getHeader();
|
||||
|
||||
for (const auto & it : columns_to_remove)
|
||||
res << ", " << it;
|
||||
if (res.has(it))
|
||||
res.erase(it);
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
return res;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -24,23 +24,6 @@ public:
|
||||
|
||||
String getName() const override { return "ReplacingSorted"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "ReplacingSorted(inputs";
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << ", " << children[i]->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ", version_column, " << version_column << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Can return 1 more records than max_block_size.
|
||||
Block readImpl() override;
|
||||
|
@ -16,12 +16,7 @@ public:
|
||||
|
||||
String getName() const override { return "Squashing"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Squashing(" << children.at(0)->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
@ -23,24 +23,6 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
String SummingSortedBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "SummingSorted(inputs";
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
res << ", " << children[i]->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (size_t i = 0; i < description.size(); ++i)
|
||||
res << ", " << description[i].getID();
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
|
||||
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
|
||||
{
|
||||
for (auto & desc : columns_to_aggregate)
|
||||
|
@ -35,8 +35,6 @@ public:
|
||||
|
||||
String getName() const override { return "SummingSorted"; }
|
||||
|
||||
String getID() const override;
|
||||
|
||||
protected:
|
||||
/// Can return 1 more records than max_block_size.
|
||||
Block readImpl() override;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <DataStreams/TotalsHavingBlockInputStream.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/AggregateDescription.h>
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/FilterDescription.h>
|
||||
@ -29,26 +30,18 @@ TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
|
||||
}
|
||||
|
||||
|
||||
String TotalsHavingBlockInputStream::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "TotalsHavingBlockInputStream(" << children.back()->getID()
|
||||
<< "," << filter_column_name << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
|
||||
static void finalize(Block & block)
|
||||
{
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
{
|
||||
ColumnWithTypeAndName & current = block.safeGetByPosition(i);
|
||||
const ColumnAggregateFunction * unfinalized_column = typeid_cast<const ColumnAggregateFunction *>(current.column.get());
|
||||
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
|
||||
|
||||
if (unfinalized_column)
|
||||
if (unfinalized_type)
|
||||
{
|
||||
current.type = unfinalized_column->getAggregateFunction()->getReturnType();
|
||||
current.column = unfinalized_column->convertToValues();
|
||||
current.type = unfinalized_type->getReturnType();
|
||||
if (current.column)
|
||||
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -70,7 +63,7 @@ const Block & TotalsHavingBlockInputStream::getTotals()
|
||||
addToTotals(overflow_aggregates, nullptr);
|
||||
}
|
||||
|
||||
totals = header.cloneWithColumns(std::move(current_totals));
|
||||
totals = children.at(0)->getHeader().cloneWithColumns(std::move(current_totals));
|
||||
finalize(totals);
|
||||
}
|
||||
|
||||
@ -81,6 +74,16 @@ const Block & TotalsHavingBlockInputStream::getTotals()
|
||||
}
|
||||
|
||||
|
||||
Block TotalsHavingBlockInputStream::getHeader() const
|
||||
{
|
||||
Block res = children.at(0)->getHeader();
|
||||
finalize(res);
|
||||
if (expression)
|
||||
expression->execute(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
Block TotalsHavingBlockInputStream::readImpl()
|
||||
{
|
||||
Block finalized;
|
||||
@ -90,9 +93,6 @@ Block TotalsHavingBlockInputStream::readImpl()
|
||||
{
|
||||
block = children[0]->read();
|
||||
|
||||
if (!header)
|
||||
header = block.cloneEmpty();
|
||||
|
||||
/// Block with values not included in `max_rows_to_group_by`. We'll postpone it.
|
||||
if (overflow_row && block && block.info.is_overflows)
|
||||
{
|
||||
|
@ -19,6 +19,7 @@ private:
|
||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
|
||||
public:
|
||||
/// expression may be nullptr
|
||||
TotalsHavingBlockInputStream(
|
||||
const BlockInputStreamPtr & input_,
|
||||
bool overflow_row_, const ExpressionActionsPtr & expression_,
|
||||
@ -26,10 +27,10 @@ public:
|
||||
|
||||
String getName() const override { return "TotalsHaving"; }
|
||||
|
||||
String getID() const override;
|
||||
|
||||
const Block & getTotals() override;
|
||||
|
||||
Block getHeader() const override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
@ -42,8 +43,6 @@ private:
|
||||
size_t passed_keys = 0;
|
||||
size_t total_keys = 0;
|
||||
|
||||
Block header;
|
||||
|
||||
/** Here are the values that did not pass max_rows_to_group_by.
|
||||
* They are added or not added to the current_totals, depending on the totals_mode.
|
||||
*/
|
||||
|
@ -86,26 +86,6 @@ public:
|
||||
|
||||
String getName() const override { return "Union"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "Union(";
|
||||
|
||||
Strings children_ids(children.size());
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
children_ids[i] = children[i]->getID();
|
||||
|
||||
/// Order does not matter.
|
||||
std::sort(children_ids.begin(), children_ids.end());
|
||||
|
||||
for (size_t i = 0; i < children_ids.size(); ++i)
|
||||
res << (i == 0 ? "" : ", ") << children_ids[i];
|
||||
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
|
||||
~UnionBlockInputStream() override
|
||||
{
|
||||
try
|
||||
@ -139,6 +119,8 @@ public:
|
||||
return doGetBlockExtraInfo();
|
||||
}
|
||||
|
||||
Block getHeader() const override { return children.at(0)->getHeader(); }
|
||||
|
||||
protected:
|
||||
void finalize()
|
||||
{
|
||||
|
@ -185,24 +185,6 @@ public:
|
||||
|
||||
String getName() const override { return "VersionedCollapsingSorted"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "VersionedCollapsingSortedBlockInputStream(inputs";
|
||||
|
||||
for (const auto & child : children)
|
||||
res << ", " << child->getID();
|
||||
|
||||
res << ", description";
|
||||
|
||||
for (const auto & descr : description)
|
||||
res << ", " << descr.getID();
|
||||
|
||||
res << ", sign_column, " << sign_column;
|
||||
res << ", version_column, " << sign_column << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Can return 1 more records than max_block_size.
|
||||
Block readImpl() override;
|
||||
|
@ -73,7 +73,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
|
||||
*/
|
||||
if (is_local)
|
||||
return executeQuery(load_all_query, context, true).in;
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, context);
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
|
||||
}
|
||||
|
||||
|
||||
@ -103,7 +103,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
|
||||
{
|
||||
if (is_local)
|
||||
return executeQuery(query, context, true).in;
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, query, context);
|
||||
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -8,13 +8,6 @@ DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count
|
||||
{
|
||||
}
|
||||
|
||||
String DictionaryBlockInputStreamBase::getID() const
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << static_cast<const void*>(this);
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
Block DictionaryBlockInputStreamBase::readImpl()
|
||||
{
|
||||
if (next_row == rows_count)
|
||||
@ -26,4 +19,9 @@ Block DictionaryBlockInputStreamBase::readImpl()
|
||||
return block;
|
||||
}
|
||||
|
||||
Block DictionaryBlockInputStreamBase::getHeader() const
|
||||
{
|
||||
return getBlock(0, 0);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -11,17 +11,16 @@ protected:
|
||||
|
||||
DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size);
|
||||
|
||||
String getID() const override;
|
||||
|
||||
virtual Block getBlock(size_t start, size_t length) const = 0;
|
||||
|
||||
Block getHeader() const override;
|
||||
|
||||
private:
|
||||
const size_t rows_count;
|
||||
const size_t max_block_size;
|
||||
size_t next_row;
|
||||
size_t next_row = 0;
|
||||
|
||||
Block readImpl() override;
|
||||
void readPrefixImpl() override { next_row = 0; }
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -101,6 +101,8 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
Block getHeader() const override { return stream->getHeader(); };
|
||||
|
||||
private:
|
||||
Block readImpl() override { return stream->read(); }
|
||||
|
||||
@ -118,7 +120,6 @@ private:
|
||||
}
|
||||
|
||||
String getName() const override { return "WithBackgroundThread"; }
|
||||
String getID() const override { return "WithBackgroundThread(" + stream->getID() + ")"; }
|
||||
|
||||
BlockInputStreamPtr stream;
|
||||
std::unique_ptr<ShellCommand> command;
|
||||
|
@ -38,14 +38,6 @@ MongoDBBlockInputStream::MongoDBBlockInputStream(
|
||||
MongoDBBlockInputStream::~MongoDBBlockInputStream() = default;
|
||||
|
||||
|
||||
String MongoDBBlockInputStream::getID() const
|
||||
{
|
||||
std::ostringstream stream;
|
||||
stream << cursor.get();
|
||||
return "MongoDB(@" + stream.str() + ")";
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
|
@ -32,7 +32,7 @@ public:
|
||||
|
||||
String getName() const override { return "MongoDB"; }
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return description.sample_block; };
|
||||
|
||||
private:
|
||||
Block readImpl() override;
|
||||
|
@ -33,12 +33,6 @@ MySQLBlockInputStream::MySQLBlockInputStream(
|
||||
}
|
||||
|
||||
|
||||
String MySQLBlockInputStream::getID() const
|
||||
{
|
||||
return "MySQL(" + query.str() + ")";
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
|
@ -21,7 +21,7 @@ public:
|
||||
|
||||
String getName() const override { return "MySQL"; }
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return description.sample_block; };
|
||||
|
||||
private:
|
||||
Block readImpl() override;
|
||||
|
@ -38,12 +38,6 @@ ODBCBlockInputStream::ODBCBlockInputStream(
|
||||
}
|
||||
|
||||
|
||||
String ODBCBlockInputStream::getID() const
|
||||
{
|
||||
return "ODBC(" + statement.toString() + ")";
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using ValueType = ExternalResultDescription::ValueType;
|
||||
|
@ -27,7 +27,7 @@ public:
|
||||
|
||||
String getName() const override { return "ODBC"; }
|
||||
|
||||
String getID() const override;
|
||||
Block getHeader() const override { return description.sample_block; };
|
||||
|
||||
private:
|
||||
Block readImpl() override;
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include <DataTypes/getMostSubtype.h>
|
||||
#include <Core/TypeListNumber.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -156,7 +157,7 @@ public:
|
||||
++index;
|
||||
}
|
||||
|
||||
ColumnPtr getNullMapData() && { return std::move(sink_null_map_holder); }
|
||||
ColumnPtr getNullMapColumnPtr() && { return std::move(sink_null_map_holder); }
|
||||
|
||||
private:
|
||||
const UInt8 * src_null_map = nullptr;
|
||||
@ -776,25 +777,25 @@ DataTypePtr FunctionArrayElement::getReturnTypeImpl(const DataTypes & arguments)
|
||||
void FunctionArrayElement::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result)
|
||||
{
|
||||
/// Check nullability.
|
||||
bool is_nullable_array = false;
|
||||
bool is_array_of_nullable = false;
|
||||
|
||||
const ColumnArray * col_array = nullptr;
|
||||
const ColumnArray * col_const_array = nullptr;
|
||||
|
||||
col_array = checkAndGetColumn<ColumnArray>(block.getByPosition(arguments[0]).column.get());
|
||||
if (col_array)
|
||||
is_nullable_array = col_array->getData().isColumnNullable();
|
||||
is_array_of_nullable = col_array->getData().isColumnNullable();
|
||||
else
|
||||
{
|
||||
col_const_array = checkAndGetColumnConstData<ColumnArray>(block.getByPosition(arguments[0]).column.get());
|
||||
if (col_const_array)
|
||||
is_nullable_array = col_const_array->getData().isColumnNullable();
|
||||
is_array_of_nullable = col_const_array->getData().isColumnNullable();
|
||||
else
|
||||
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
|
||||
+ " of first argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
}
|
||||
|
||||
if (!is_nullable_array)
|
||||
if (!is_array_of_nullable)
|
||||
{
|
||||
ArrayImpl::NullMapBuilder builder;
|
||||
perform(block, arguments, result, builder);
|
||||
@ -860,7 +861,7 @@ void FunctionArrayElement::executeImpl(Block & block, const ColumnNumbers & argu
|
||||
/// Store the result.
|
||||
const ColumnWithTypeAndName & source_col = source_block.getByPosition(2);
|
||||
ColumnWithTypeAndName & dest_col = block.getByPosition(result);
|
||||
dest_col.column = ColumnNullable::create(source_col.column, std::move(builder).getNullMapData());
|
||||
dest_col.column = ColumnNullable::create(source_col.column, builder ? std::move(builder).getNullMapColumnPtr() : ColumnUInt8::create());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,6 +53,13 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_PARSE_UUID;
|
||||
extern const int TOO_LARGE_STRING_SIZE;
|
||||
extern const int TOO_LESS_ARGUMENTS_FOR_FUNCTION;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int TYPE_MISMATCH;
|
||||
extern const int CANNOT_CONVERT_TYPE;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
|
||||
@ -508,9 +515,6 @@ struct ConvertImplGenericFromString
|
||||
{
|
||||
auto res = data_type_to.createColumn();
|
||||
|
||||
if (!size)
|
||||
return;
|
||||
|
||||
IColumn & column_to = *res;
|
||||
column_to.reserve(size);
|
||||
|
||||
@ -1272,9 +1276,7 @@ private:
|
||||
static WrapperType createFixedStringWrapper(const DataTypePtr & from_type, const size_t N)
|
||||
{
|
||||
if (!from_type->isStringOrFixedString())
|
||||
throw Exception{
|
||||
"CAST AS FixedString is only implemented for types String and FixedString",
|
||||
ErrorCodes::NOT_IMPLEMENTED};
|
||||
throw Exception{"CAST AS FixedString is only implemented for types String and FixedString", ErrorCodes::NOT_IMPLEMENTED};
|
||||
|
||||
return [N] (Block & block, const ColumnNumbers & arguments, const size_t result)
|
||||
{
|
||||
@ -1309,9 +1311,7 @@ private:
|
||||
|
||||
/// both from_type and to_type should be nullptr now is array types had same dimensions
|
||||
if ((from_type == nullptr) != (to_type == nullptr))
|
||||
throw Exception{
|
||||
"CAST AS Array can only be performed between same-dimensional array types or from String",
|
||||
ErrorCodes::TYPE_MISMATCH};
|
||||
throw Exception{"CAST AS Array can only be performed between same-dimensional array types or from String", ErrorCodes::TYPE_MISMATCH};
|
||||
|
||||
/// Prepare nested type conversion
|
||||
const auto nested_function = prepare(from_nested_type, to_nested_type.get());
|
||||
@ -1337,9 +1337,7 @@ private:
|
||||
block.getByPosition(result).column = ColumnArray::create(nested_block.getByPosition(1).column, col_array->getOffsetsPtr());
|
||||
}
|
||||
else
|
||||
throw Exception{
|
||||
"Illegal column " + array_arg.column->getName() + " for function CAST AS Array",
|
||||
ErrorCodes::LOGICAL_ERROR};
|
||||
throw Exception{"Illegal column " + array_arg.column->getName() + " for function CAST AS Array", ErrorCodes::LOGICAL_ERROR};
|
||||
};
|
||||
}
|
||||
|
||||
@ -1356,16 +1354,12 @@ private:
|
||||
|
||||
const auto from_type = checkAndGetDataType<DataTypeTuple>(from_type_untyped.get());
|
||||
if (!from_type)
|
||||
throw Exception{
|
||||
"CAST AS Tuple can only be performed between tuple types or from String.\nLeft type: " + from_type_untyped->getName() +
|
||||
", right type: " + to_type->getName(),
|
||||
ErrorCodes::TYPE_MISMATCH};
|
||||
throw Exception{"CAST AS Tuple can only be performed between tuple types or from String.\nLeft type: " + from_type_untyped->getName() +
|
||||
", right type: " + to_type->getName(), ErrorCodes::TYPE_MISMATCH};
|
||||
|
||||
if (from_type->getElements().size() != to_type->getElements().size())
|
||||
throw Exception{
|
||||
"CAST AS Tuple can only be performed between tuple types with the same number of elements or from String.\n"
|
||||
"Left type: " + from_type->getName() + ", right type: " + to_type->getName(),
|
||||
ErrorCodes::TYPE_MISMATCH};
|
||||
throw Exception{"CAST AS Tuple can only be performed between tuple types with the same number of elements or from String.\n"
|
||||
"Left type: " + from_type->getName() + ", right type: " + to_type->getName(), ErrorCodes::TYPE_MISMATCH};
|
||||
|
||||
const auto & from_element_types = from_type->getElements();
|
||||
const auto & to_element_types = to_type->getElements();
|
||||
@ -1441,10 +1435,8 @@ private:
|
||||
};
|
||||
}
|
||||
else
|
||||
throw Exception{
|
||||
"Conversion from " + from_type->getName() + " to " + to_type->getName() +
|
||||
" is not supported",
|
||||
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
throw Exception{"Conversion from " + from_type->getName() + " to " + to_type->getName() +
|
||||
" is not supported", ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
}
|
||||
|
||||
template <typename EnumTypeFrom, typename EnumTypeTo>
|
||||
@ -1467,10 +1459,8 @@ private:
|
||||
const auto & old_value = name_value.second;
|
||||
const auto & new_value = to_type->getValue(name_value.first);
|
||||
if (old_value != new_value)
|
||||
throw Exception{
|
||||
"Enum conversion changes value for element '" + name_value.first +
|
||||
"' from " + toString(old_value) + " to " + toString(new_value),
|
||||
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
throw Exception{"Enum conversion changes value for element '" + name_value.first +
|
||||
"' from " + toString(old_value) + " to " + toString(new_value), ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
}
|
||||
};
|
||||
|
||||
@ -1499,8 +1489,7 @@ private:
|
||||
col_with_type_and_name.column = std::move(res);
|
||||
}
|
||||
else
|
||||
throw Exception{
|
||||
"Unexpected column " + first_col->getName() + " as first argument of function " + function_name,
|
||||
throw Exception{"Unexpected column " + first_col->getName() + " as first argument of function " + function_name,
|
||||
ErrorCodes::LOGICAL_ERROR};
|
||||
};
|
||||
}
|
||||
@ -1540,8 +1529,7 @@ private:
|
||||
|
||||
/// Check that the requested conversion is allowed.
|
||||
if (nullable_conversion.source_is_nullable && !nullable_conversion.result_is_nullable)
|
||||
throw Exception{"Cannot convert data from a nullable type to a non-nullable type",
|
||||
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
throw Exception{"Cannot convert data from a nullable type to a non-nullable type", ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
|
||||
if (from_type->onlyNull())
|
||||
{
|
||||
@ -1671,9 +1659,7 @@ private:
|
||||
/// It's possible to use ConvertImplGenericFromString to convert from String to AggregateFunction,
|
||||
/// but it is disabled because deserializing aggregate functions state might be unsafe.
|
||||
|
||||
throw Exception{
|
||||
"Conversion from " + from_type->getName() + " to " + to_type->getName() + " is not supported",
|
||||
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
throw Exception{"Conversion from " + from_type->getName() + " to " + to_type->getName() + " is not supported", ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||
}
|
||||
};
|
||||
|
||||
@ -1709,8 +1695,7 @@ protected:
|
||||
{
|
||||
const auto type_col = checkAndGetColumnConst<ColumnString>(arguments.back().column.get());
|
||||
if (!type_col)
|
||||
throw Exception("Second argument to " + getName() + " must be a constant string describing type",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
throw Exception("Second argument to " + getName() + " must be a constant string describing type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
return DataTypeFactory::instance().get(type_col->getValue<String>());
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ void registerFunctionsHigherOrder(FunctionFactory & factory)
|
||||
factory.registerFunction<FunctionArraySort>();
|
||||
factory.registerFunction<FunctionArrayReverseSort>();
|
||||
factory.registerFunction<FunctionArrayCumSum>();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -599,30 +599,17 @@ public:
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
|
||||
{
|
||||
const IColumn * col = block.getByPosition(arguments[0]).column.get();
|
||||
double seconds;
|
||||
size_t size = col->size();
|
||||
|
||||
if (auto column = checkAndGetColumnConst<ColumnVector<Float64>>(col))
|
||||
seconds = column->getValue<Float64>();
|
||||
|
||||
else if (auto column = checkAndGetColumnConst<ColumnVector<Float32>>(col))
|
||||
seconds = static_cast<double>(column->getValue<Float64>());
|
||||
|
||||
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt64>>(col))
|
||||
seconds = static_cast<double>(column->getValue<UInt64>());
|
||||
|
||||
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt32>>(col))
|
||||
seconds = static_cast<double>(column->getValue<UInt32>());
|
||||
|
||||
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt16>>(col))
|
||||
seconds = static_cast<double>(column->getValue<UInt16>());
|
||||
|
||||
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt8>>(col))
|
||||
seconds = static_cast<double>(column->getValue<UInt8>());
|
||||
|
||||
else
|
||||
if (!col->isColumnConst())
|
||||
throw Exception("The argument of function " + getName() + " must be constant.", ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
Float64 seconds = applyVisitor(FieldVisitorConvertToNumber<Float64>(), static_cast<const ColumnConst &>(*col).getField());
|
||||
|
||||
if (seconds < 0)
|
||||
throw Exception("Cannot sleep negative amount of time (not implemented)", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
size_t size = col->size();
|
||||
|
||||
/// We do not sleep if the block is empty.
|
||||
if (size > 0)
|
||||
{
|
||||
|
@ -65,6 +65,8 @@ public:
|
||||
|
||||
bool isVariadic() const override { return true; }
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
bool useDefaultImplementationForConstants() const override { return true; }
|
||||
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
||||
{
|
||||
@ -189,13 +191,13 @@ public:
|
||||
private:
|
||||
void executeConst(Block & block, const ColumnNumbers & arguments, const size_t result)
|
||||
{
|
||||
/// Construct a block of full-size columns of size 1 and compute the function as usual.
|
||||
/// Materialize the input column and compute the function as usual.
|
||||
|
||||
Block tmp_block;
|
||||
ColumnNumbers tmp_arguments;
|
||||
|
||||
tmp_block.insert(block.getByPosition(arguments[0]));
|
||||
tmp_block.getByPosition(0).column = static_cast<const ColumnConst *>(tmp_block.getByPosition(0).column.get())->getDataColumnPtr();
|
||||
tmp_block.getByPosition(0).column = tmp_block.getByPosition(0).column->cloneResized(block.rows())->convertToFullColumnIfConst();
|
||||
tmp_arguments.push_back(0);
|
||||
|
||||
for (size_t i = 1; i < arguments.size(); ++i)
|
||||
@ -209,9 +211,7 @@ private:
|
||||
|
||||
execute(tmp_block, tmp_arguments, tmp_result);
|
||||
|
||||
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(
|
||||
block.rows(),
|
||||
(*tmp_block.getByPosition(tmp_result).column)[0]);
|
||||
block.getByPosition(result).column = tmp_block.getByPosition(tmp_result).column;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
@ -727,7 +727,7 @@ private:
|
||||
/// Different versions of the hash tables to implement the mapping.
|
||||
|
||||
using NumToNum = HashMap<UInt64, UInt64, HashCRC32<UInt64>>;
|
||||
using NumToString = HashMap <UInt64, StringRef, HashCRC32 <UInt64 >>; /// Everywhere StringRef's with trailing zero.
|
||||
using NumToString = HashMap <UInt64, StringRef, HashCRC32<UInt64>>; /// Everywhere StringRef's with trailing zero.
|
||||
using StringToNum = HashMap<StringRef, UInt64, StringRefHash>;
|
||||
using StringToString = HashMap<StringRef, StringRef, StringRefHash>;
|
||||
|
||||
@ -740,7 +740,7 @@ private:
|
||||
|
||||
Field const_default_value; /// Null, if not specified.
|
||||
|
||||
bool initialized = false;
|
||||
std::atomic<bool> initialized {false};
|
||||
std::mutex mutex;
|
||||
|
||||
/// Can be called from different threads. It works only on the first call.
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Common/setThreadName.h>
|
||||
|
||||
#include <DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
@ -88,31 +89,55 @@ void AggregatedDataVariants::convertToTwoLevel()
|
||||
}
|
||||
|
||||
|
||||
void Aggregator::Params::calculateColumnNumbers(const Block & block)
|
||||
Block Aggregator::getHeader(bool final) const
|
||||
{
|
||||
if (keys.empty() && !key_names.empty())
|
||||
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
|
||||
keys.push_back(block.getPositionByName(*it));
|
||||
Block res;
|
||||
|
||||
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
|
||||
if (it->arguments.empty() && !it->argument_names.empty())
|
||||
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
|
||||
it->arguments.push_back(block.getPositionByName(*jt));
|
||||
if (params.src_header)
|
||||
{
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty());
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
size_t arguments_size = params.aggregates[i].arguments.size();
|
||||
DataTypes argument_types(arguments_size);
|
||||
for (size_t j = 0; j < arguments_size; ++j)
|
||||
argument_types[j] = params.src_header.safeGetByPosition(params.aggregates[i].arguments[j]).type;
|
||||
|
||||
DataTypePtr type;
|
||||
if (final)
|
||||
type = params.aggregates[i].function->getReturnType();
|
||||
else
|
||||
type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
|
||||
|
||||
res.insert({ type->createColumn(), type, params.aggregates[i].column_name });
|
||||
}
|
||||
}
|
||||
else if (params.intermediate_header)
|
||||
{
|
||||
res = params.intermediate_header.cloneEmpty();
|
||||
|
||||
if (final)
|
||||
{
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
auto & elem = res.getByPosition(params.keys_size + i);
|
||||
|
||||
elem.type = params.aggregates[i].function->getReturnType();
|
||||
elem.column = elem.type->createColumn();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
void Aggregator::initialize(const Block & block)
|
||||
Aggregator::Aggregator(const Params & params_)
|
||||
: params(params_),
|
||||
isCancelled([]() { return false; })
|
||||
{
|
||||
if (isCancelled())
|
||||
return;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
if (initialized)
|
||||
return;
|
||||
|
||||
initialized = true;
|
||||
|
||||
if (current_memory_tracker)
|
||||
memory_usage_before_aggregation = current_memory_tracker->get();
|
||||
|
||||
@ -134,56 +159,7 @@ void Aggregator::initialize(const Block & block)
|
||||
all_aggregates_has_trivial_destructor = false;
|
||||
}
|
||||
|
||||
if (isCancelled())
|
||||
return;
|
||||
|
||||
/** All below, if non-empty block passed.
|
||||
* (it doesn't needed in methods that merging blocks with aggregation states).
|
||||
*/
|
||||
if (!block)
|
||||
return;
|
||||
|
||||
/// Transform names of columns to numbers.
|
||||
params.calculateColumnNumbers(block);
|
||||
|
||||
if (isCancelled())
|
||||
return;
|
||||
|
||||
/// Create "header" block, describing result.
|
||||
if (!sample)
|
||||
{
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
{
|
||||
sample.insert(block.safeGetByPosition(params.keys[i]).cloneEmpty());
|
||||
if (ColumnPtr converted = sample.safeGetByPosition(i).column->convertToFullColumnIfConst())
|
||||
sample.safeGetByPosition(i).column = converted;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
ColumnWithTypeAndName col;
|
||||
col.name = params.aggregates[i].column_name;
|
||||
|
||||
size_t arguments_size = params.aggregates[i].arguments.size();
|
||||
DataTypes argument_types(arguments_size);
|
||||
for (size_t j = 0; j < arguments_size; ++j)
|
||||
argument_types[j] = block.safeGetByPosition(params.aggregates[i].arguments[j]).type;
|
||||
|
||||
col.type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
|
||||
col.column = col.type->createColumn();
|
||||
|
||||
sample.insert(std::move(col));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Aggregator::setSampleBlock(const Block & block)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
if (!sample)
|
||||
sample = block.cloneEmpty();
|
||||
method = chooseAggregationMethod();
|
||||
}
|
||||
|
||||
|
||||
@ -377,102 +353,70 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
|
||||
}
|
||||
|
||||
|
||||
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) const
|
||||
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
|
||||
{
|
||||
/// If no keys. All aggregating to single row.
|
||||
if (params.keys_size == 0)
|
||||
return AggregatedDataVariants::Type::without_key;
|
||||
|
||||
/// Check if at least one of the specified keys is nullable.
|
||||
/// Create a set of nested key columns from the corresponding key columns.
|
||||
/// Here "nested" means that, if a key column is nullable, we take its nested
|
||||
/// column; otherwise we take the key column as is.
|
||||
ColumnRawPtrs nested_key_columns;
|
||||
nested_key_columns.reserve(key_columns.size());
|
||||
DataTypes types_removed_nullable;
|
||||
types_removed_nullable.reserve(params.keys.size());
|
||||
bool has_nullable_key = false;
|
||||
|
||||
for (const auto & col : key_columns)
|
||||
for (const auto & pos : params.keys)
|
||||
{
|
||||
if (col->isColumnNullable())
|
||||
const auto & type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
|
||||
|
||||
if (type->isNullable())
|
||||
{
|
||||
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(*col);
|
||||
nested_key_columns.push_back(&nullable_col.getNestedColumn());
|
||||
has_nullable_key = true;
|
||||
types_removed_nullable.push_back(removeNullable(type));
|
||||
}
|
||||
else
|
||||
nested_key_columns.push_back(col);
|
||||
types_removed_nullable.push_back(type);
|
||||
}
|
||||
|
||||
/** Returns ordinary (not two-level) methods, because we start from them.
|
||||
* Later, during aggregation process, data may be converted (partitioned) to two-level structure, if cardinality is high.
|
||||
*/
|
||||
|
||||
bool all_fixed = true;
|
||||
size_t keys_bytes = 0;
|
||||
|
||||
size_t num_array_keys = 0;
|
||||
bool has_arrays_of_non_fixed_elems = false;
|
||||
bool all_non_array_keys_are_fixed = true;
|
||||
bool has_tuples = false;
|
||||
bool has_arrays_of_nullable = false;
|
||||
size_t num_contiguous_keys = 0;
|
||||
size_t num_fixed_contiguous_keys = 0;
|
||||
size_t num_string_keys = 0;
|
||||
|
||||
key_sizes.resize(params.keys_size);
|
||||
for (size_t j = 0; j < params.keys_size; ++j)
|
||||
{
|
||||
if (nested_key_columns[j]->isFixedAndContiguous())
|
||||
if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
|
||||
{
|
||||
key_sizes[j] = nested_key_columns[j]->sizeOfValueIfFixed();
|
||||
keys_bytes += key_sizes[j];
|
||||
}
|
||||
else
|
||||
{
|
||||
all_fixed = false;
|
||||
++num_contiguous_keys;
|
||||
|
||||
if (const ColumnArray * arr = typeid_cast<const ColumnArray *>(nested_key_columns[j]))
|
||||
if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
|
||||
{
|
||||
++num_array_keys;
|
||||
|
||||
if (arr->getData().isColumnNullable())
|
||||
has_arrays_of_nullable = true;
|
||||
|
||||
if (!arr->getData().isFixedAndContiguous())
|
||||
has_arrays_of_non_fixed_elems = true;
|
||||
++num_fixed_contiguous_keys;
|
||||
key_sizes[j] = types_removed_nullable[j]->getSizeOfValueInMemory();
|
||||
keys_bytes += key_sizes[j];
|
||||
}
|
||||
else
|
||||
{
|
||||
all_non_array_keys_are_fixed = false;
|
||||
|
||||
if (typeid_cast<const ColumnTuple *>(nested_key_columns[j]))
|
||||
has_tuples = true;
|
||||
if (types_removed_nullable[j]->isString())
|
||||
{
|
||||
++num_string_keys;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If no keys. All aggregating to single row.
|
||||
if (params.keys_size == 0)
|
||||
return AggregatedDataVariants::Type::without_key;
|
||||
|
||||
if (has_nullable_key || has_arrays_of_nullable)
|
||||
if (has_nullable_key)
|
||||
{
|
||||
/// At least one key is nullable. Therefore we choose an aggregation method
|
||||
/// that takes into account this fact.
|
||||
if ((params.keys_size == 1) && (nested_key_columns[0]->isNumeric()))
|
||||
{
|
||||
/// We have exactly one key and it is nullable. We shall add it a tag
|
||||
/// which specifies whether its value is null or not.
|
||||
size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
|
||||
if ((size_of_field == 1) || (size_of_field == 2) || (size_of_field == 4) || (size_of_field == 8) || (size_of_field == 16))
|
||||
return AggregatedDataVariants::Type::nullable_keys128;
|
||||
else
|
||||
throw Exception{"Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.",
|
||||
ErrorCodes::LOGICAL_ERROR};
|
||||
}
|
||||
|
||||
if (all_fixed)
|
||||
if (params.keys_size == num_fixed_contiguous_keys)
|
||||
{
|
||||
/// Pack if possible all the keys along with information about which key values are nulls
|
||||
/// into a fixed 16- or 32-byte blob.
|
||||
if (keys_bytes > (std::numeric_limits<size_t>::max() - std::tuple_size<KeysNullMap<UInt128>>::value))
|
||||
throw Exception{"Aggregator: keys sizes overflow", ErrorCodes::LOGICAL_ERROR};
|
||||
if ((std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes) <= 16)
|
||||
if (std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes <= 16)
|
||||
return AggregatedDataVariants::Type::nullable_keys128;
|
||||
if ((std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes) <= 32)
|
||||
if (std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes <= 32)
|
||||
return AggregatedDataVariants::Type::nullable_keys256;
|
||||
}
|
||||
|
||||
@ -483,9 +427,9 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
|
||||
/// No key has been found to be nullable.
|
||||
|
||||
/// Single numeric key.
|
||||
if ((params.keys_size == 1) && nested_key_columns[0]->isNumeric())
|
||||
if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
|
||||
{
|
||||
size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
|
||||
size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();
|
||||
if (size_of_field == 1)
|
||||
return AggregatedDataVariants::Type::key8;
|
||||
if (size_of_field == 2)
|
||||
@ -500,23 +444,24 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
|
||||
}
|
||||
|
||||
/// If all keys fits in N bits, will use hash table with all keys packed (placed contiguously) to single N-bit key.
|
||||
if (all_fixed && keys_bytes <= 16)
|
||||
return AggregatedDataVariants::Type::keys128;
|
||||
if (all_fixed && keys_bytes <= 32)
|
||||
return AggregatedDataVariants::Type::keys256;
|
||||
if (params.keys_size == num_fixed_contiguous_keys)
|
||||
{
|
||||
if (keys_bytes <= 16)
|
||||
return AggregatedDataVariants::Type::keys128;
|
||||
if (keys_bytes <= 32)
|
||||
return AggregatedDataVariants::Type::keys256;
|
||||
}
|
||||
|
||||
/// If single string key - will use hash table with references to it. Strings itself are stored separately in Arena.
|
||||
if (params.keys_size == 1 && typeid_cast<const ColumnString *>(nested_key_columns[0]))
|
||||
if (params.keys_size == 1 && types_removed_nullable[0]->isString())
|
||||
return AggregatedDataVariants::Type::key_string;
|
||||
|
||||
if (params.keys_size == 1 && typeid_cast<const ColumnFixedString *>(nested_key_columns[0]))
|
||||
if (params.keys_size == 1 && types_removed_nullable[0]->isFixedString())
|
||||
return AggregatedDataVariants::Type::key_fixed_string;
|
||||
|
||||
/** If some keys are arrays.
|
||||
* If there is no more than one key that is array, and it is array of fixed-size elements, and all other keys are fixed-size,
|
||||
* then it is possible to use 'concat' method (due to one-to-one correspondense). Otherwise the method will be 'serialized'.
|
||||
/** If it is possible to use 'concat' method due to one-to-one correspondense. Otherwise the method will be 'serialized'.
|
||||
*/
|
||||
if (num_array_keys == 1 && !has_arrays_of_non_fixed_elems && all_non_array_keys_are_fixed)
|
||||
if (params.keys_size == num_contiguous_keys && num_fixed_contiguous_keys + 1 >= num_contiguous_keys)
|
||||
return AggregatedDataVariants::Type::concat;
|
||||
|
||||
/** For case with multiple strings, we use 'concat' method despite the fact, that correspondense is not one-to-one.
|
||||
@ -524,11 +469,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
|
||||
* But if strings contains zero bytes in between, different keys may clash.
|
||||
* For example, keys ('a\0b', 'c') and ('a', 'b\0c') will be aggregated as one key.
|
||||
* This is documented behaviour. It may be avoided by just switching to 'serialized' method, which is less efficient.
|
||||
*
|
||||
* Some of aggregation keys may be tuples. In most cases, tuples are flattened in expression analyzer and not passed here.
|
||||
* But in rare cases, they are not flattened. Will fallback to 'serialized' method for simplicity.
|
||||
*/
|
||||
if (num_array_keys == 0 && !has_tuples)
|
||||
if (params.keys_size == num_fixed_contiguous_keys + num_string_keys)
|
||||
return AggregatedDataVariants::Type::concat;
|
||||
|
||||
return AggregatedDataVariants::Type::serialized;
|
||||
@ -707,12 +649,9 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
|
||||
|
||||
|
||||
bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
|
||||
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns,
|
||||
Sizes & key_sizes, StringRefs & key,
|
||||
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, StringRefs & key,
|
||||
bool & no_more_keys)
|
||||
{
|
||||
initialize(block);
|
||||
|
||||
if (isCancelled())
|
||||
return true;
|
||||
|
||||
@ -769,7 +708,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
|
||||
/// How to perform the aggregation?
|
||||
if (result.empty())
|
||||
{
|
||||
result.init(chooseAggregationMethod(key_columns, key_sizes));
|
||||
result.init(method);
|
||||
result.keys_size = params.keys_size;
|
||||
result.key_sizes = key_sizes;
|
||||
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
|
||||
@ -1056,7 +995,6 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
|
||||
StringRefs key(params.keys_size);
|
||||
ColumnRawPtrs key_columns(params.keys_size);
|
||||
AggregateColumns aggregate_columns(params.aggregates_size);
|
||||
Sizes key_sizes;
|
||||
|
||||
/** Used if there is a limit on the maximum number of rows in the aggregation,
|
||||
* and if group_by_overflow_mode == ANY.
|
||||
@ -1081,14 +1019,12 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
|
||||
src_rows += block.rows();
|
||||
src_bytes += block.bytes();
|
||||
|
||||
if (!executeOnBlock(block, result,
|
||||
key_columns, aggregate_columns, key_sizes, key,
|
||||
no_more_keys))
|
||||
if (!executeOnBlock(block, result, key_columns, aggregate_columns, key, no_more_keys))
|
||||
break;
|
||||
}
|
||||
|
||||
double elapsed_seconds = watch.elapsedSeconds();
|
||||
size_t rows = result.size();
|
||||
size_t rows = result.sizeWithoutOverflowRow();
|
||||
LOG_TRACE(log, std::fixed << std::setprecision(3)
|
||||
<< "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
|
||||
<< " in " << elapsed_seconds << " sec."
|
||||
@ -1174,9 +1110,11 @@ Block Aggregator::prepareBlockAndFill(
|
||||
MutableColumns final_aggregate_columns(params.aggregates_size);
|
||||
AggregateColumnsData aggregate_columns_data(params.aggregates_size);
|
||||
|
||||
Block header = getHeader(final);
|
||||
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
{
|
||||
key_columns[i] = sample.safeGetByPosition(i).column->cloneEmpty();
|
||||
key_columns[i] = header.safeGetByPosition(i).type->createColumn();
|
||||
key_columns[i]->reserve(rows);
|
||||
}
|
||||
|
||||
@ -1184,7 +1122,7 @@ Block Aggregator::prepareBlockAndFill(
|
||||
{
|
||||
if (!final)
|
||||
{
|
||||
aggregate_columns[i] = sample.safeGetByPosition(i + params.keys_size).column->cloneEmpty();
|
||||
aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn();
|
||||
|
||||
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
|
||||
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
|
||||
@ -1213,7 +1151,7 @@ Block Aggregator::prepareBlockAndFill(
|
||||
|
||||
filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.key_sizes, final);
|
||||
|
||||
Block res = sample.cloneEmpty();
|
||||
Block res = header.cloneEmpty();
|
||||
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
res.getByPosition(i).column = std::move(key_columns[i]);
|
||||
@ -1221,18 +1159,13 @@ Block Aggregator::prepareBlockAndFill(
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
{
|
||||
if (final)
|
||||
{
|
||||
res.getByPosition(i + params.keys_size).type = aggregate_functions[i]->getReturnType();
|
||||
res.getByPosition(i + params.keys_size).column = std::move(final_aggregate_columns[i]);
|
||||
}
|
||||
else
|
||||
{
|
||||
res.getByPosition(i + params.keys_size).column = std::move(aggregate_columns[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/// Change the size of the columns-constants in the block.
|
||||
size_t columns = sample.columns();
|
||||
size_t columns = header.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
if (res.getByPosition(i).column->isColumnConst())
|
||||
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
|
||||
@ -1653,12 +1586,7 @@ public:
|
||||
|
||||
String getName() const override { return "MergingAndConverting"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << this;
|
||||
return res.str();
|
||||
}
|
||||
Block getHeader() const override { return aggregator.getHeader(final); }
|
||||
|
||||
~MergingAndConvertingBlockInputStream()
|
||||
{
|
||||
@ -1846,7 +1774,7 @@ std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
|
||||
non_empty_data.push_back(data);
|
||||
|
||||
if (non_empty_data.empty())
|
||||
return std::make_unique<NullBlockInputStream>();
|
||||
return std::make_unique<NullBlockInputStream>(getHeader(final));
|
||||
|
||||
if (non_empty_data.size() > 1)
|
||||
{
|
||||
@ -2023,14 +1951,6 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
|
||||
|
||||
void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads)
|
||||
{
|
||||
if (isCancelled())
|
||||
return;
|
||||
|
||||
StringRefs key(params.keys_size);
|
||||
ColumnRawPtrs key_columns(params.keys_size);
|
||||
|
||||
initialize({});
|
||||
|
||||
if (isCancelled())
|
||||
return;
|
||||
|
||||
@ -2062,15 +1982,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
|
||||
if (bucket_to_blocks.empty())
|
||||
return;
|
||||
|
||||
setSampleBlock(bucket_to_blocks.begin()->second.front());
|
||||
|
||||
/// How to perform the aggregation?
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
key_columns[i] = sample.safeGetByPosition(i).column.get();
|
||||
|
||||
Sizes key_sizes;
|
||||
AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
|
||||
|
||||
/** `minus one` means the absence of information about the bucket
|
||||
* - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
|
||||
* If there is at least one block with a bucket number greater than zero, then there was a two-level aggregation.
|
||||
@ -2111,7 +2022,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
|
||||
|
||||
LOG_TRACE(log, "Merging partially aggregated two-level data.");
|
||||
|
||||
auto merge_bucket = [&bucket_to_blocks, &result, &key_sizes, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
|
||||
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
|
||||
{
|
||||
current_memory_tracker = memory_tracker;
|
||||
|
||||
@ -2122,7 +2033,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
|
||||
|
||||
#define M(NAME) \
|
||||
else if (result.type == AggregatedDataVariants::Type::NAME) \
|
||||
mergeStreamsImpl(block, key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
|
||||
mergeStreamsImpl(block, result.key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
|
||||
|
||||
if (false) {}
|
||||
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
|
||||
@ -2190,7 +2101,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
else if (result.type == AggregatedDataVariants::Type::NAME) \
|
||||
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
|
||||
mergeStreamsImpl(block, result.key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
|
||||
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
@ -2214,32 +2125,19 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
|
||||
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
|
||||
Stopwatch watch;
|
||||
|
||||
StringRefs key(params.keys_size);
|
||||
ColumnRawPtrs key_columns(params.keys_size);
|
||||
|
||||
initialize({});
|
||||
setSampleBlock(blocks.front());
|
||||
|
||||
/// How to perform the aggregation?
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
key_columns[i] = sample.safeGetByPosition(i).column.get();
|
||||
|
||||
Sizes key_sizes;
|
||||
AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
|
||||
|
||||
/** If possible, change 'method' to some_hash64. Otherwise, leave as is.
|
||||
* Better hash function is needed because during external aggregation,
|
||||
* we may merge partitions of data with total number of keys far greater than 4 billion.
|
||||
*/
|
||||
|
||||
#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
|
||||
M(key64) \
|
||||
M(key_string) \
|
||||
M(key64) \
|
||||
M(key_string) \
|
||||
M(key_fixed_string) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(concat) \
|
||||
M(serialized) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(concat) \
|
||||
M(serialized) \
|
||||
|
||||
#define M(NAME) \
|
||||
if (method == AggregatedDataVariants::Type::NAME) \
|
||||
@ -2377,20 +2275,16 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
|
||||
if (!block)
|
||||
return {};
|
||||
|
||||
initialize({});
|
||||
setSampleBlock(block);
|
||||
|
||||
AggregatedDataVariants data;
|
||||
|
||||
StringRefs key(params.keys_size);
|
||||
ColumnRawPtrs key_columns(params.keys_size);
|
||||
Sizes key_sizes;
|
||||
|
||||
/// Remember the columns we will work with
|
||||
for (size_t i = 0; i < params.keys_size; ++i)
|
||||
key_columns[i] = block.safeGetByPosition(i).column.get();
|
||||
|
||||
AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
|
||||
AggregatedDataVariants::Type type = method;
|
||||
data.keys_size = params.keys_size;
|
||||
data.key_sizes = key_sizes;
|
||||
|
||||
@ -2496,30 +2390,6 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
|
||||
}
|
||||
|
||||
|
||||
String Aggregator::getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
|
||||
if (params.keys.empty())
|
||||
{
|
||||
res << "key_names";
|
||||
for (size_t i = 0; i < params.key_names.size(); ++i)
|
||||
res << ", " << params.key_names[i];
|
||||
}
|
||||
else
|
||||
{
|
||||
res << "keys";
|
||||
for (size_t i = 0; i < params.keys.size(); ++i)
|
||||
res << ", " << params.keys[i];
|
||||
}
|
||||
|
||||
res << ", aggregates";
|
||||
for (size_t i = 0; i < params.aggregates_size; ++i)
|
||||
res << ", " << params.aggregates[i].column_name;
|
||||
|
||||
return res.str();
|
||||
}
|
||||
|
||||
void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
|
||||
{
|
||||
isCancelled = cancellation_hook;
|
||||
|
@ -812,8 +812,8 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
switch (type_)
|
||||
{
|
||||
case Type::EMPTY: break;
|
||||
case Type::without_key: break;
|
||||
case Type::EMPTY: break;
|
||||
case Type::without_key: break;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: NAME = std::make_unique<decltype(NAME)::element_type>(); break;
|
||||
@ -832,8 +832,8 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY: return 0;
|
||||
case Type::without_key: return 1;
|
||||
case Type::EMPTY: return 0;
|
||||
case Type::without_key: return 1;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: return NAME->data.size() + (without_key != nullptr);
|
||||
@ -850,8 +850,8 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY: return 0;
|
||||
case Type::without_key: return 1;
|
||||
case Type::EMPTY: return 0;
|
||||
case Type::without_key: return 1;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: return NAME->data.size();
|
||||
@ -867,8 +867,8 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY: return "EMPTY";
|
||||
case Type::without_key: return "without_key";
|
||||
case Type::EMPTY: return "EMPTY";
|
||||
case Type::without_key: return "without_key";
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: return #NAME;
|
||||
@ -884,8 +884,8 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case Type::EMPTY: return false;
|
||||
case Type::without_key: return false;
|
||||
case Type::EMPTY: return false;
|
||||
case Type::without_key: return false;
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
case Type::NAME: return IS_TWO_LEVEL;
|
||||
@ -900,25 +900,25 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
||||
M(key32) \
|
||||
M(key64) \
|
||||
M(key_string) \
|
||||
M(key_fixed_string) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(hashed) \
|
||||
M(concat) \
|
||||
M(serialized) \
|
||||
M(nullable_keys128) \
|
||||
M(nullable_keys256) \
|
||||
M(key_string) \
|
||||
M(key_fixed_string) \
|
||||
M(keys128) \
|
||||
M(keys256) \
|
||||
M(hashed) \
|
||||
M(concat) \
|
||||
M(serialized) \
|
||||
M(nullable_keys128) \
|
||||
M(nullable_keys256) \
|
||||
|
||||
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
||||
M(key8) \
|
||||
M(key8) \
|
||||
M(key16) \
|
||||
M(key64_hash64) \
|
||||
M(key_string_hash64) \
|
||||
M(key_string_hash64)\
|
||||
M(key_fixed_string_hash64) \
|
||||
M(keys128_hash64) \
|
||||
M(keys256_hash64) \
|
||||
M(concat_hash64) \
|
||||
M(keys128_hash64) \
|
||||
M(keys256_hash64) \
|
||||
M(concat_hash64) \
|
||||
M(serialized_hash64) \
|
||||
|
||||
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
|
||||
@ -943,16 +943,16 @@ struct AggregatedDataVariants : private boost::noncopyable
|
||||
void convertToTwoLevel();
|
||||
|
||||
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
|
||||
M(key32_two_level) \
|
||||
M(key64_two_level) \
|
||||
M(key_string_two_level) \
|
||||
M(key_fixed_string_two_level) \
|
||||
M(keys128_two_level) \
|
||||
M(keys256_two_level) \
|
||||
M(hashed_two_level) \
|
||||
M(concat_two_level) \
|
||||
M(serialized_two_level) \
|
||||
M(nullable_keys128_two_level) \
|
||||
M(key32_two_level) \
|
||||
M(key64_two_level) \
|
||||
M(key_string_two_level) \
|
||||
M(key_fixed_string_two_level) \
|
||||
M(keys128_two_level) \
|
||||
M(keys256_two_level) \
|
||||
M(hashed_two_level) \
|
||||
M(concat_two_level) \
|
||||
M(serialized_two_level) \
|
||||
M(nullable_keys128_two_level) \
|
||||
M(nullable_keys256_two_level)
|
||||
};
|
||||
|
||||
@ -979,9 +979,13 @@ class Aggregator
|
||||
public:
|
||||
struct Params
|
||||
{
|
||||
/// Data structure of source blocks.
|
||||
Block src_header;
|
||||
/// Data structure of intermediate blocks before merge.
|
||||
Block intermediate_header;
|
||||
|
||||
/// What to count.
|
||||
Names key_names;
|
||||
ColumnNumbers keys; /// The column numbers are computed later.
|
||||
ColumnNumbers keys;
|
||||
AggregateDescriptions aggregates;
|
||||
size_t keys_size;
|
||||
size_t aggregates_size;
|
||||
@ -1008,35 +1012,34 @@ public:
|
||||
const std::string tmp_path;
|
||||
|
||||
Params(
|
||||
const Names & key_names_, const AggregateDescriptions & aggregates_,
|
||||
const Block & src_header_,
|
||||
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
|
||||
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
|
||||
Compiler * compiler_, UInt32 min_count_to_compile_,
|
||||
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
|
||||
size_t max_bytes_before_external_group_by_, const std::string & tmp_path_)
|
||||
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
|
||||
: src_header(src_header_),
|
||||
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
|
||||
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
||||
compiler(compiler_), min_count_to_compile(min_count_to_compile_),
|
||||
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
|
||||
max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_)
|
||||
{
|
||||
std::sort(key_names.begin(), key_names.end());
|
||||
key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end());
|
||||
keys_size = key_names.size();
|
||||
}
|
||||
|
||||
/// Only parameters that matter during merge.
|
||||
Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_)
|
||||
: Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") {}
|
||||
Params(const Block & intermediate_header_,
|
||||
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_)
|
||||
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "")
|
||||
{
|
||||
intermediate_header = intermediate_header_;
|
||||
}
|
||||
|
||||
/// Compute the column numbers in `keys` and `aggregates`.
|
||||
/// Calculate the column numbers in `keys` and `aggregates`.
|
||||
void calculateColumnNumbers(const Block & block);
|
||||
};
|
||||
|
||||
Aggregator(const Params & params_)
|
||||
: params(params_),
|
||||
isCancelled([]() { return false; })
|
||||
{
|
||||
}
|
||||
Aggregator(const Params & params_);
|
||||
|
||||
/// Aggregate the source. Get the result in the form of one of the data structures.
|
||||
void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result);
|
||||
@ -1049,7 +1052,7 @@ public:
|
||||
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
|
||||
bool executeOnBlock(Block & block, AggregatedDataVariants & result,
|
||||
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
|
||||
Sizes & key_sizes, StringRefs & keys, /// - pass the corresponding objects that are initially empty.
|
||||
StringRefs & keys, /// - pass the corresponding objects that are initially empty.
|
||||
bool & no_more_keys);
|
||||
|
||||
/** Convert the aggregation data structure into a block.
|
||||
@ -1087,9 +1090,6 @@ public:
|
||||
*/
|
||||
void setCancellationHook(const CancellationHook cancellation_hook);
|
||||
|
||||
/// For IBlockInputStream.
|
||||
String getID() const;
|
||||
|
||||
/// For external aggregation.
|
||||
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
|
||||
|
||||
@ -1111,12 +1111,18 @@ public:
|
||||
|
||||
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
|
||||
|
||||
/// Get data structure of the result.
|
||||
Block getHeader(bool final) const;
|
||||
|
||||
protected:
|
||||
friend struct AggregatedDataVariants;
|
||||
friend class MergingAndConvertingBlockInputStream;
|
||||
|
||||
Params params;
|
||||
|
||||
AggregatedDataVariants::Type method;
|
||||
Sizes key_sizes;
|
||||
|
||||
AggregateFunctionsPlainPtrs aggregate_functions;
|
||||
|
||||
/** This array serves two purposes.
|
||||
@ -1145,12 +1151,8 @@ protected:
|
||||
/// How many RAM were used to process the query before processing the first block.
|
||||
Int64 memory_usage_before_aggregation = 0;
|
||||
|
||||
/// To initialize from the first block when used concurrently.
|
||||
bool initialized = false;
|
||||
std::mutex mutex;
|
||||
|
||||
Block sample;
|
||||
|
||||
Logger * log = &Logger::get("Aggregator");
|
||||
|
||||
/** Dynamically compiled library for aggregation, if any.
|
||||
@ -1179,18 +1181,8 @@ protected:
|
||||
/// For external aggregation.
|
||||
TemporaryFiles temporary_files;
|
||||
|
||||
/** If only the column names (key_names, and also aggregates[i].column_name) are specified, then calculate the column numbers.
|
||||
* Generate block - sample of the result. It is used in the convertToBlocks, mergeAndConvertToBlocks methods.
|
||||
*/
|
||||
void initialize(const Block & block);
|
||||
|
||||
/** Set the block - sample of the result,
|
||||
* only if it has not already been set.
|
||||
*/
|
||||
void setSampleBlock(const Block & block);
|
||||
|
||||
/** Select the aggregation method based on the number and types of keys. */
|
||||
AggregatedDataVariants::Type chooseAggregationMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) const;
|
||||
AggregatedDataVariants::Type chooseAggregationMethod();
|
||||
|
||||
/** Create states of aggregate functions for one key.
|
||||
*/
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <DataStreams/BlockExtraInfoInputStream.h>
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -46,7 +47,7 @@ void DescribeStreamFactory::createForShard(
|
||||
}
|
||||
|
||||
auto remote_stream = std::make_shared<RemoteBlockInputStream>(
|
||||
shard_info.pool, query, context, nullptr, throttler);
|
||||
shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), context, nullptr, throttler);
|
||||
remote_stream->setPoolMode(PoolMode::GET_ALL);
|
||||
remote_stream->appendExtraInfo();
|
||||
res.emplace_back(std::move(remote_stream));
|
||||
|
@ -28,12 +28,14 @@ namespace ClusterProxy
|
||||
{
|
||||
|
||||
SelectStreamFactory::SelectStreamFactory(
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
QualifiedTableName main_table_,
|
||||
const Tables & external_tables_)
|
||||
: processed_stage{processed_stage_}
|
||||
, main_table(std::move(main_table_))
|
||||
, external_tables{external_tables_}
|
||||
const Block & header,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
QualifiedTableName main_table_,
|
||||
const Tables & external_tables_)
|
||||
: header(header),
|
||||
processed_stage{processed_stage_},
|
||||
main_table(std::move(main_table_)),
|
||||
external_tables{external_tables_}
|
||||
{
|
||||
}
|
||||
|
||||
@ -55,10 +57,10 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context &
|
||||
}
|
||||
|
||||
void SelectStreamFactory::createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res)
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res)
|
||||
{
|
||||
auto emplace_local_stream = [&]()
|
||||
{
|
||||
@ -67,7 +69,7 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto emplace_remote_stream = [&]()
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context, nullptr, throttler, external_tables, processed_stage);
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
|
||||
stream->setPoolMode(PoolMode::GET_MANY);
|
||||
stream->setMainTable(main_table);
|
||||
res.emplace_back(std::move(stream));
|
||||
@ -157,7 +159,7 @@ void SelectStreamFactory::createForShard(
|
||||
auto lazily_create_stream = [
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, query, query_ast, context, throttler,
|
||||
main_table = main_table, external_tables = external_tables, stage = processed_stage,
|
||||
local_delay]()
|
||||
local_delay, this]()
|
||||
-> BlockInputStreamPtr
|
||||
{
|
||||
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
|
||||
@ -192,11 +194,11 @@ void SelectStreamFactory::createForShard(
|
||||
connections.emplace_back(std::move(try_result.entry));
|
||||
|
||||
return std::make_shared<RemoteBlockInputStream>(
|
||||
std::move(connections), query, context, nullptr, throttler, external_tables, stage);
|
||||
std::move(connections), query, header, context, nullptr, throttler, external_tables, stage);
|
||||
}
|
||||
};
|
||||
|
||||
res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", lazily_create_stream));
|
||||
res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
|
||||
}
|
||||
else
|
||||
emplace_remote_stream();
|
||||
|
@ -14,6 +14,7 @@ class SelectStreamFactory final : public IStreamFactory
|
||||
{
|
||||
public:
|
||||
SelectStreamFactory(
|
||||
const Block & header,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
QualifiedTableName main_table,
|
||||
const Tables & external_tables);
|
||||
@ -25,6 +26,7 @@ public:
|
||||
BlockInputStreams & res) override;
|
||||
|
||||
private:
|
||||
const Block header;
|
||||
QueryProcessingStage::Enum processed_stage;
|
||||
QualifiedTableName main_table;
|
||||
const Tables & external_tables;
|
||||
|
@ -940,7 +940,7 @@ class DDLQueryStatusInputSream : public IProfilingBlockInputStream
|
||||
public:
|
||||
|
||||
DDLQueryStatusInputSream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context)
|
||||
: node_path(zk_node_path), context(context), watch(CLOCK_MONOTONIC_COARSE), log(&Logger::get("DDLQueryStatusInputSream"))
|
||||
: node_path(zk_node_path), context(context), watch(CLOCK_MONOTONIC_COARSE), log(&Logger::get("DDLQueryStatusInputSream"))
|
||||
{
|
||||
sample = Block{
|
||||
{std::make_shared<DataTypeString>(), "host"},
|
||||
@ -964,10 +964,7 @@ public:
|
||||
return "DDLQueryStatusInputSream";
|
||||
}
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
return "DDLQueryStatusInputSream(" + node_path + ")";
|
||||
}
|
||||
Block getHeader() const override { return sample; };
|
||||
|
||||
Block readImpl() override
|
||||
{
|
||||
@ -1138,7 +1135,6 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
|
||||
return io;
|
||||
|
||||
auto stream = std::make_shared<DDLQueryStatusInputSream>(node_path, entry, context);
|
||||
io.in_sample = stream->getSampleBlock();
|
||||
io.in = std::move(stream);
|
||||
return io;
|
||||
}
|
||||
|
@ -910,45 +910,6 @@ void ExpressionActions::finalize(const Names & output_columns)
|
||||
}
|
||||
|
||||
|
||||
std::string ExpressionActions::getID() const
|
||||
{
|
||||
std::stringstream ss;
|
||||
|
||||
for (size_t i = 0; i < actions.size(); ++i)
|
||||
{
|
||||
if (i)
|
||||
ss << ", ";
|
||||
if (actions[i].type == ExpressionAction::APPLY_FUNCTION)
|
||||
ss << actions[i].result_name;
|
||||
if (actions[i].type == ExpressionAction::ARRAY_JOIN)
|
||||
{
|
||||
ss << (actions[i].array_join_is_left ? "LEFT ARRAY JOIN" : "ARRAY JOIN") << "{";
|
||||
for (NameSet::const_iterator it = actions[i].array_joined_columns.begin();
|
||||
it != actions[i].array_joined_columns.end(); ++it)
|
||||
{
|
||||
if (it != actions[i].array_joined_columns.begin())
|
||||
ss << ", ";
|
||||
ss << *it;
|
||||
}
|
||||
ss << "}";
|
||||
}
|
||||
|
||||
/// TODO JOIN
|
||||
}
|
||||
|
||||
ss << ": {";
|
||||
NamesAndTypesList output_columns = sample_block.getNamesAndTypesList();
|
||||
for (NamesAndTypesList::const_iterator it = output_columns.begin(); it != output_columns.end(); ++it)
|
||||
{
|
||||
if (it != output_columns.begin())
|
||||
ss << ", ";
|
||||
ss << it->name;
|
||||
}
|
||||
ss << "}";
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
std::string ExpressionActions::dumpActions() const
|
||||
{
|
||||
std::stringstream ss;
|
||||
@ -1064,7 +1025,7 @@ BlockInputStreamPtr ExpressionActions::createStreamWithNonJoinedDataIfFullOrRigh
|
||||
{
|
||||
Block left_sample_block;
|
||||
for (const auto & input_elem : input_columns)
|
||||
left_sample_block.insert({ nullptr, input_elem.type, input_elem.name });
|
||||
left_sample_block.insert(ColumnWithTypeAndName{ input_elem.type, input_elem.name });
|
||||
|
||||
return action.join->createStreamWithNonJoinedRows(left_sample_block, max_block_size);
|
||||
}
|
||||
|
@ -194,8 +194,6 @@ public:
|
||||
/// Obtain a sample block that contains the names and types of result columns.
|
||||
const Block & getSampleBlock() const { return sample_block; }
|
||||
|
||||
std::string getID() const;
|
||||
|
||||
std::string dumpActions() const;
|
||||
|
||||
static std::string getSmallestColumn(const NamesAndTypesList & columns);
|
||||
|
@ -883,7 +883,6 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name_or_t
|
||||
|
||||
external_tables[external_table_name] = external_storage;
|
||||
subqueries_for_sets[external_table_name].source = interpreter->execute().in;
|
||||
subqueries_for_sets[external_table_name].source_sample = interpreter->getSampleBlock();
|
||||
subqueries_for_sets[external_table_name].table = external_storage;
|
||||
|
||||
/** NOTE If it was written IN tmp_table - the existing temporary (but not external) table,
|
||||
@ -1661,8 +1660,7 @@ void ExpressionAnalyzer::makeSet(const ASTFunction * node, const Block & sample_
|
||||
{
|
||||
auto interpreter = interpretSubquery(arg, context, subquery_depth, {});
|
||||
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
|
||||
[interpreter]() mutable { return interpreter->execute().in; });
|
||||
subquery_for_set.source_sample = interpreter->getSampleBlock();
|
||||
interpreter->getSampleBlock(), [interpreter]() mutable { return interpreter->execute().in; });
|
||||
|
||||
/** Why is LazyBlockInputStream used?
|
||||
*
|
||||
@ -2486,13 +2484,14 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
|
||||
table = table_to_join.subquery;
|
||||
|
||||
auto interpreter = interpretSubquery(table, context, subquery_depth, required_joined_columns);
|
||||
subquery_for_set.source = std::make_shared<LazyBlockInputStream>([interpreter]() mutable { return interpreter->execute().in; });
|
||||
subquery_for_set.source_sample = interpreter->getSampleBlock();
|
||||
subquery_for_set.source = std::make_shared<LazyBlockInputStream>(
|
||||
interpreter->getSampleBlock(),
|
||||
[interpreter]() mutable { return interpreter->execute().in; });
|
||||
}
|
||||
|
||||
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
|
||||
subquery_for_set.join = join;
|
||||
subquery_for_set.join->setSampleBlock(subquery_for_set.source_sample);
|
||||
subquery_for_set.join->setSampleBlock(subquery_for_set.source->getHeader());
|
||||
}
|
||||
|
||||
addJoinAction(step.actions, false);
|
||||
|
@ -41,7 +41,6 @@ struct SubqueryForSet
|
||||
{
|
||||
/// The source is obtained using the InterpreterSelectQuery subquery.
|
||||
BlockInputStreamPtr source;
|
||||
Block source_sample;
|
||||
|
||||
/// If set, build it from result.
|
||||
SetPtr set;
|
||||
|
@ -244,7 +244,6 @@ BlockIO InterpreterCheckQuery::execute()
|
||||
|
||||
BlockIO res;
|
||||
res.in = std::make_shared<OneBlockInputStream>(block);
|
||||
res.in_sample = getSampleBlock();
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -256,7 +255,6 @@ BlockIO InterpreterCheckQuery::execute()
|
||||
|
||||
BlockIO res;
|
||||
res.in = std::make_shared<OneBlockInputStream>(result);
|
||||
res.in_sample = result.cloneEmpty();
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -573,7 +573,6 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
out = std::make_shared<ProhibitColumnsBlockOutputStream>(out, columns.materialized_columns);
|
||||
|
||||
BlockIO io;
|
||||
io.in_sample = as_select_sample;
|
||||
io.in = std::make_shared<NullAndDoCopyBlockInputStream>(interpreter_select->execute().in, out);
|
||||
|
||||
return io;
|
||||
|
@ -24,8 +24,6 @@ BlockIO InterpreterDescribeQuery::execute()
|
||||
{
|
||||
BlockIO res;
|
||||
res.in = executeImpl();
|
||||
res.in_sample = getSampleBlock();
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -21,11 +21,12 @@ public:
|
||||
|
||||
BlockIO execute() override;
|
||||
|
||||
static Block getSampleBlock();
|
||||
|
||||
private:
|
||||
ASTPtr query_ptr;
|
||||
const Context & context;
|
||||
|
||||
Block getSampleBlock();
|
||||
BlockInputStreamPtr executeImpl();
|
||||
};
|
||||
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user