Merge remote-tracking branch 'upstream/master'

This commit is contained in:
BayoNet 2018-02-21 13:09:07 +03:00
commit 8d7ccb833e
214 changed files with 1520 additions and 1490 deletions

2
contrib/zookeeper vendored

@ -1 +1 @@
Subproject commit 5aa9e889fe9e739af3c2a00222d9a3a0a57179dd
Subproject commit 438afae5af36c5be9c82d074f43a9bb19e0797c0

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54352-testing)
set(VERSION_REVISION 54352)
set(VERSION_DESCRIBE v1.1.54355-testing)
set(VERSION_REVISION 54355)
# end of autochange
set (VERSION_MAJOR 1)

View File

@ -164,8 +164,7 @@ public:
{
const auto cond_arg = arguments[i].get();
if (!typeid_cast<const DataTypeUInt8 *>(cond_arg))
throw Exception{
"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) +
throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) +
" of aggregate function " + derived().getName() + ", must be UInt8",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}

View File

@ -3,6 +3,7 @@
#include <sys/utsname.h>
#include <cerrno>
#include <cstring>
#include <algorithm>
#include <iostream>
#include <functional>
#include <Poco/DOM/Text.h>
@ -356,7 +357,7 @@ void ConfigProcessor::doIncludesRecursive(
ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string & config_path)
{
Files res;
Files files;
Poco::Path merge_dir_path(config_path);
merge_dir_path.setExtension("d");
@ -378,12 +379,14 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
Poco::File & file = *it;
if (file.isFile() && (endsWith(file.path(), ".xml") || endsWith(file.path(), ".conf")))
{
res.push_back(file.path());
files.push_back(file.path());
}
}
}
return res;
std::sort(files.begin(), files.end());
return files;
}
XMLDocumentPtr ConfigProcessor::processConfig(

View File

@ -2,6 +2,7 @@
#include <string>
#include <unordered_set>
#include <vector>
#include <Poco/DOM/Document.h>
#include <Poco/DOM/DOMParser.h>
@ -87,7 +88,7 @@ public:
void savePreprocessedConfig(const LoadedConfig & loaded_config);
public:
using Files = std::list<std::string>;
using Files = std::vector<std::string>;
static Files getConfigMergeFiles(const std::string & config_path);

View File

@ -367,6 +367,8 @@ namespace ErrorCodes
extern const int CANNOT_ASSIGN_OPTIMIZE = 388;
extern const int INSERT_WAS_DEDUPLICATED = 389;
extern const int CANNOT_GET_CREATE_TABLE_QUERY = 390;
extern const int EXTERNAL_LIBRARY_ERROR = 391;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -40,9 +40,6 @@ struct UInt128
bool inline operator> (const UInt128 rhs) const { return tuple() > rhs.tuple(); }
bool inline operator>= (const UInt128 rhs) const { return tuple() >= rhs.tuple(); }
/** Types who are stored at the moment in the database have no more than 64bits and can be handle
* inside an unique UInt64.
*/
template <typename T> bool inline operator== (const T rhs) const { return *this == UInt128(rhs); }
template <typename T> bool inline operator!= (const T rhs) const { return *this != UInt128(rhs); }
template <typename T> bool inline operator>= (const T rhs) const { return *this >= UInt128(rhs); }

View File

@ -59,7 +59,7 @@
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54311
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226

View File

@ -29,6 +29,7 @@ STRONG_TYPEDEF(TupleBackend, Tuple); /// Array and Tuple are different types wit
/** 32 is enough. Round number is used for alignment and for better arithmetic inside std::vector.
* NOTE: Actually, sizeof(std::string) is 32 when using libc++, so Field is 40 bytes.
*/
#define DBMS_MIN_FIELD_SIZE 32

View File

@ -18,7 +18,7 @@ namespace DB
std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what)
{
stream << "IBlockInputStream(id = " << what.getID() << ", name = " << what.getName() << ")";
stream << "IBlockInputStream(name = " << what.getName() << ")";
//what.dumpTree(stream); // todo: set const
return stream;
}
@ -115,7 +115,6 @@ std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what
std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what)
{
stream << "SubqueryForSet(source = " << what.source
<< ", source_sample = " << what.source_sample
// TODO: << ", set = " << what.set << ", join = " << what.join
<< ", table = " << what.table
<< ")";

View File

@ -24,11 +24,11 @@ public:
String getName() const override { return "AddingConstColumn"; }
String getID() const override
Block getHeader() const override
{
std::stringstream res;
res << "AddingConstColumn(" << children.back()->getID() << ")";
return res.str();
Block res = children.back()->getHeader();
res.insert({data_type->createColumn(), data_type, column_name});
return res;
}
protected:

View File

@ -14,6 +14,11 @@ namespace ProfileEvents
namespace DB
{
Block AggregatingBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
Block AggregatingBlockInputStream::readImpl()
{
@ -42,7 +47,7 @@ Block AggregatingBlockInputStream::readImpl()
if (!isCancelled())
{
/// Flush data in the RAM to disk also. It's easier.
/// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
if (data_variants->size())
aggregator.writeToTemporaryFile(*data_variants);
}
@ -63,9 +68,8 @@ Block AggregatingBlockInputStream::readImpl()
}
}
Block res;
if (isCancelled() || !impl)
return res;
return {};
return impl->read();
}

View File

@ -30,12 +30,7 @@ public:
String getName() const override { return "Aggregating"; }
String getID() const override
{
std::stringstream res;
res << "Aggregating(" << children.back()->getID() << ", " << aggregator.getID() << ")";
return res.str();
}
Block getHeader() const override;
protected:
Block readImpl() override;

View File

@ -28,26 +28,8 @@ public:
String getName() const override { return "AggregatingSorted"; }
String getID() const override
{
std::stringstream res;
res << "AggregatingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
protected:
/// Can return 1 more records than max_block_size.

View File

@ -35,13 +35,6 @@ public:
String getName() const override { return "Asynchronous"; }
String getID() const override
{
std::stringstream res;
res << "Asynchronous(" << children.back()->getID() << ")";
return res.str();
}
void readPrefix() override
{
/// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread.
@ -80,6 +73,9 @@ public:
}
Block getHeader() const override { return children.at(0)->getHeader(); }
~AsynchronousBlockInputStream() override
{
if (started)

View File

@ -5,7 +5,7 @@
namespace DB
{
/** Adds to one thread additional block information that is specified
/** Adds to one stream additional block information that is specified
* as the constructor parameter.
*/
class BlockExtraInfoInputStream : public IProfilingBlockInputStream
@ -24,12 +24,7 @@ public:
String getName() const override { return "BlockExtraInfoInput"; }
String getID() const override
{
std::stringstream res;
res << "BlockExtraInfoInput(" << children.back()->getID() << ")";
return res.str();
}
Block getHeader() const override { return children.back()->getHeader(); }
protected:
Block readImpl() override

View File

@ -21,7 +21,6 @@ struct BlockIO
BlockInputStreamPtr in;
BlockOutputStreamPtr out;
Block in_sample; /// Example of a block to be read from `in`.
Block out_sample; /// Example of a block to be written to `out`.
/// Callbacks for query logging could be set here.
@ -51,7 +50,6 @@ struct BlockIO
process_list_entry = rhs.process_list_entry;
in = rhs.in;
out = rhs.out;
in_sample = rhs.in_sample;
out_sample = rhs.out_sample;
finish_callback = rhs.finish_callback;

View File

@ -29,15 +29,10 @@ public:
String getName() const override { return "BlockInputStreamFromRowInputStream"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
RowInputStreamPtr & getRowInput() { return row_input; }
Block getHeader() const override { return sample; }
protected:
Block readImpl() override;

View File

@ -22,13 +22,6 @@ public:
String getName() const override { return "BlocksList"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
protected:
Block readImpl() override
{

View File

@ -20,11 +20,6 @@ String CastTypeBlockInputStream::getName() const
return "CastType";
}
String CastTypeBlockInputStream::getID() const
{
return "CastType(" + children.back()->getID() + ")";
}
Block CastTypeBlockInputStream::readImpl()
{
Block block = children.back()->read();

View File

@ -17,7 +17,7 @@ public:
String getName() const override;
String getID() const override;
Block getHeader() const override { return ref_definition; }
protected:
Block readImpl() override;

View File

@ -28,26 +28,11 @@ public:
String getName() const override { return "CollapsingFinal"; }
String getID() const override
{
std::stringstream res;
res << "CollapsingFinal(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", sign_column, " << sign_column_name << ")";
return res.str();
}
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -33,23 +33,6 @@ public:
String getName() const override { return "CollapsingSorted"; }
String getID() const override
{
std::stringstream res;
res << "CollapsingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", sign_column, " << sign_column << ")";
return res.str();
}
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;

View File

@ -30,19 +30,6 @@ ColumnGathererStream::ColumnGathererStream(
}
String ColumnGathererStream::getID() const
{
std::stringstream res;
res << getName() << "(";
for (size_t i = 0; i < children.size(); ++i)
res << (i == 0 ? "" : ", " ) << children[i]->getID();
res << ")";
return res.str();
}
void ColumnGathererStream::init()
{
sources.reserve(children.size());
@ -107,13 +94,13 @@ void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getID() + ", part " + toString(source_num));
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_num]->getID() + ", part " + toString(source_num),
throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}

View File

@ -61,12 +61,12 @@ public:
String getName() const override { return "ColumnGatherer"; }
String getID() const override;
Block readImpl() override;
void readSuffixImpl() override;
Block getHeader() const override { return children.at(0)->getHeader(); }
/// for use in implementations of IColumn::gather()
template <typename Column>
void gather(Column & column_res);

View File

@ -22,24 +22,7 @@ public:
String getName() const override { return "Concat"; }
String getID() const override
{
std::stringstream res;
res << "Concat(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Let's assume that the order of concatenation of blocks does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override

View File

@ -35,24 +35,7 @@ public:
String getName() const override { return "CreatingSets"; }
String getID() const override
{
std::stringstream res;
res << "CreatingSets(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Let's assume that the order of creating sets does not matter.
std::sort(children_ids.begin(), children_ids.end() - 1);
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
Block getHeader() const override { return children.back()->getHeader(); }
/// Takes `totals` only from the main source, not from subquery sources.
const Block & getTotals() override;

View File

@ -18,13 +18,6 @@ DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & i
children.push_back(input);
}
String DistinctBlockInputStream::getID() const
{
std::stringstream res;
res << "Distinct(" << children.back()->getID() << ")";
return res.str();
}
Block DistinctBlockInputStream::readImpl()
{
/// Execute until end of stream or until

View File

@ -22,7 +22,7 @@ public:
String getName() const override { return "Distinct"; }
String getID() const override;
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -19,13 +19,6 @@ DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(const BlockInputS
children.push_back(input);
}
String DistinctSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "DistinctSorted(" << children.back()->getID() << ")";
return res.str();
}
Block DistinctSortedBlockInputStream::readImpl()
{
/// Execute until end of stream or until

View File

@ -25,7 +25,7 @@ public:
String getName() const override { return "DistinctSorted"; }
String getID() const override;
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -13,13 +13,6 @@ ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr
String ExpressionBlockInputStream::getName() const { return "Expression"; }
String ExpressionBlockInputStream::getID() const
{
std::stringstream res;
res << "Expression(" << children.back()->getID() << ", " << expression->getID() << ")";
return res.str();
}
const Block & ExpressionBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
@ -31,14 +24,19 @@ const Block & ExpressionBlockInputStream::getTotals()
return totals;
}
Block ExpressionBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
expression->execute(res);
return res;
}
Block ExpressionBlockInputStream::readImpl()
{
Block res = children.back()->read();
if (!res)
return res;
expression->execute(res);
return res;
}

View File

@ -22,8 +22,8 @@ public:
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override;
String getID() const override;
const Block & getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;

View File

@ -23,24 +23,36 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
children.push_back(input);
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_)
: expression(expression_), filter_column(-1), filter_column_name(filter_column_name_)
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name)
: expression(expression_)
{
children.push_back(input);
/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);
filter_column = header.getPositionByName(filter_column_name);
/// Isn't the filter already constant?
ColumnPtr column = header.safeGetByPosition(filter_column).column;
if (column)
constant_filter_description = ConstantFilterDescription(*column);
if (!constant_filter_description.always_false
&& !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
auto & header_filter_elem = header.getByPosition(filter_column);
header_filter_elem.column = header_filter_elem.type->createColumnConst(header.rows(), UInt64(1));
}
}
String FilterBlockInputStream::getName() const { return "Filter"; }
String FilterBlockInputStream::getID() const
{
std::stringstream res;
res << "Filter(" << children.back()->getID() << ", " << expression->getID() << ", " << filter_column << ", " << filter_column_name << ")";
return res.str();
}
const Block & FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
@ -53,37 +65,18 @@ const Block & FilterBlockInputStream::getTotals()
}
Block FilterBlockInputStream::getHeader() const
{
return header;
}
Block FilterBlockInputStream::readImpl()
{
Block res;
if (is_first)
{
is_first = false;
const Block & sample_block = expression->getSampleBlock();
/// Find the current position of the filter column in the block.
/** sample_block has the result structure of evaluating the expression.
* But this structure does not necessarily match expression->execute(res) below,
* because the expression can be applied to a block that also contains additional,
* columns unnecessary for this expression, but needed later, in the next stages of the query execution pipeline.
* There will be no such columns in sample_block.
* Therefore, the position of the filter column in it can be different.
*/
ssize_t filter_column_in_sample_block = filter_column;
if (filter_column_in_sample_block == -1)
filter_column_in_sample_block = sample_block.getPositionByName(filter_column_name);
/// Let's check if the filter column is a constant containing 0 or 1.
ColumnPtr column = sample_block.safeGetByPosition(filter_column_in_sample_block).column;
if (column)
constant_filter_description = ConstantFilterDescription(*column);
if (constant_filter_description.always_false)
return res;
}
if (constant_filter_description.always_false)
return res;
/// Until non-empty block after filtering or end of stream.
while (1)
@ -97,10 +90,6 @@ Block FilterBlockInputStream::readImpl()
if (constant_filter_description.always_true)
return res;
/// Find the current position of the filter column in the block.
if (filter_column == -1)
filter_column = res.getPositionByName(filter_column_name);
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;

View File

@ -25,18 +25,16 @@ public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
String getName() const override;
String getID() const override;
const Block & getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
private:
ExpressionActionsPtr expression;
Block header;
ssize_t filter_column;
String filter_column_name;
bool is_first = true;
ConstantFilterDescription constant_filter_description;
};

View File

@ -3,16 +3,16 @@
namespace DB
{
String FilterColumnsBlockInputStream::getID() const
Block FilterColumnsBlockInputStream::getHeader() const
{
std::stringstream res;
res << "FilterColumnsBlockInputStream(" << children.back()->getID();
Block block = children.back()->getHeader();
Block filtered;
for (const auto & it : columns_to_save)
res << ", " << it;
if (throw_if_column_not_found || block.has(it))
filtered.insert(std::move(block.getByName(it)));
res << ")";
return res.str();
return filtered;
}
Block FilterColumnsBlockInputStream::readImpl()

View File

@ -24,7 +24,7 @@ public:
return "FilterColumnsBlockInputStream";
}
String getID() const override;
Block getHeader() const override;
protected:
Block readImpl() override;

View File

@ -59,7 +59,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
if (name == "Native")
{
return std::make_shared<NativeBlockInputStream>(buf);
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
}
else if (name == "RowBinary")
{

View File

@ -135,23 +135,6 @@ public:
String getName() const override { return "GraphiteRollupSorted"; }
String getID() const override
{
std::stringstream res;
res << "GraphiteRollupSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
~GraphiteRollupSortedBlockInputStream()
{
if (aggregate_state_created)

View File

@ -64,6 +64,7 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
ostr << String(indent, ' ') << getName();
if (multiplier > 1)
ostr << " × " << multiplier;
// ostr << ": " << getHeader().dumpStructure();
ostr << std::endl;
++indent;
@ -125,13 +126,5 @@ void IBlockInputStream::getLeavesImpl(BlockInputStreams & res, const BlockInputS
(*it)->getLeavesImpl(res, *it);
}
/// By default all instances is different streams
String IBlockInputStream::getID() const
{
std::stringstream res;
res << getName() << "(" << this << ")";
return res.str();
};
}

View File

@ -48,6 +48,12 @@ class IBlockInputStream : private boost::noncopyable
public:
IBlockInputStream() {}
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* It is guaranteed that method "read" returns blocks of exactly that structure.
*/
virtual Block getHeader() const = 0;
/** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
@ -76,14 +82,6 @@ public:
*/
virtual String getName() const = 0;
/** The unique identifier of the pipeline part of the query execution.
* Sources with the same identifier are considered identical
* (producing the same data), and can be replaced by one source
* if several queries are executed simultaneously.
* If the source can not be glued together with any other - return the object's address as an identifier.
*/
virtual String getID() const;
/// If this stream generates data in grouped by some keys, return true.
virtual bool isGroupedOutput() const { return false; }
/// If this stream generates data in order by some keys, return true.

View File

@ -1,20 +1,16 @@
#pragma once
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/Context.h>
#include <IO/ConcatReadBuffer.h>
#include <Parsers/IAST.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/BlockIO.h>
#include <cstddef>
#include <memory>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
struct BlockIO;
class Context;
/** Prepares an input stream which produce data containing in INSERT query
* Head of inserting data could be stored in INSERT ast directly
@ -23,7 +19,6 @@ namespace ErrorCodes
class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
Block readImpl() override { return res_stream->read(); }
@ -31,10 +26,10 @@ public:
void readSuffixImpl() override { return res_stream->readSuffix(); }
String getName() const override { return "InputStreamFromASTInsertQuery"; }
String getID() const override { return "InputStreamFromASTInsertQuery(" + toString(std::intptr_t(this)) + ")"; }
Block getHeader() const override { return res_stream->getHeader(); }
private:
std::unique_ptr<ReadBuffer> input_buffer_ast_part;
std::unique_ptr<ReadBuffer> input_buffer_contacenated;

View File

@ -15,14 +15,13 @@ class LazyBlockInputStream : public IProfilingBlockInputStream
public:
using Generator = std::function<BlockInputStreamPtr()>;
LazyBlockInputStream(Generator generator_)
: generator(std::move(generator_))
LazyBlockInputStream(const Block & header_, Generator generator_)
: header(header_), generator(std::move(generator_))
{
}
LazyBlockInputStream(const char * name_, Generator generator_)
: name(name_)
, generator(std::move(generator_))
LazyBlockInputStream(const char * name_, const Block & header_, Generator generator_)
: name(name_), header(header_), generator(std::move(generator_))
{
}
@ -34,6 +33,11 @@ public:
IProfilingBlockInputStream::cancel();
}
Block getHeader() const override
{
return header;
}
protected:
Block readImpl() override
{
@ -89,6 +93,7 @@ protected:
private:
const char * name = "Lazy";
Block header;
Generator generator;
BlockInputStreamPtr input;

View File

@ -21,12 +21,7 @@ public:
String getName() const override { return "Limit"; }
String getID() const override
{
std::stringstream res;
res << "Limit(" << children.back()->getID() << ", " << limit << ", " << offset << ")";
return res.str();
}
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -3,9 +3,9 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/HashTable/HashMap.h>
#include <Common/SipHash.h>
#include <Common/UInt128.h>
namespace DB
{
@ -22,6 +22,8 @@ public:
String getName() const override { return "LimitBy"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -15,11 +15,9 @@ String MaterializingBlockInputStream::getName() const
return "Materializing";
}
String MaterializingBlockInputStream::getID() const
Block MaterializingBlockInputStream::getHeader() const
{
std::stringstream res;
res << "Materializing(" << children.back()->getID() << ")";
return res.str();
return materializeBlock(children.back()->getHeader());
}
Block MaterializingBlockInputStream::readImpl()

View File

@ -12,7 +12,7 @@ class MaterializingBlockInputStream : public IProfilingBlockInputStream
public:
MaterializingBlockInputStream(const BlockInputStreamPtr & input);
String getName() const override;
String getID() const override;
Block getHeader() const override;
protected:
Block readImpl() override;

View File

@ -155,7 +155,7 @@ Block MergeSortingBlockInputStream::readImpl()
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_)
: blocks(blocks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
: blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Blocks nonempty_blocks;
for (const auto & block : blocks)

View File

@ -33,17 +33,19 @@ public:
size_t max_merged_block_size_, size_t limit_ = 0);
String getName() const override { return "MergeSortingBlocks"; }
String getID() const override { return getName(); }
bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Blocks & blocks;
Block header;
SortDescription description;
size_t max_merged_block_size;
size_t limit;
@ -80,22 +82,12 @@ public:
String getName() const override { return "MergeSorting"; }
String getID() const override
{
std::stringstream res;
res << "MergeSorting(" << children.back()->getID();
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
@ -129,7 +121,7 @@ private:
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in)) {}
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;

View File

@ -6,6 +6,11 @@
namespace DB
{
Block MergingAggregatedBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
Block MergingAggregatedBlockInputStream::readImpl()
{

View File

@ -22,12 +22,7 @@ public:
String getName() const override { return "MergingAggregated"; }
String getID() const override
{
std::stringstream res;
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator.getID() << ")";
return res.str();
}
Block getHeader() const override;
protected:
Block readImpl() override;

View File

@ -90,14 +90,9 @@ MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficie
}
String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
Block MergingAggregatedMemoryEfficientBlockInputStream::getHeader() const
{
std::stringstream res;
res << "MergingAggregatedMemoryEfficient(" << aggregator.getID();
for (size_t i = 0, size = children.size(); i < size; ++i)
res << ", " << children.back()->getID();
res << ")";
return res.str();
return aggregator.getHeader(final);
}

View File

@ -67,8 +67,6 @@ public:
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
String getID() const override;
/// Sends the request (initiates calculations) earlier than `read`.
void readPrefix() override;
@ -80,6 +78,8 @@ public:
*/
void cancel() override;
Block getHeader() const override;
protected:
Block readImpl() override;

View File

@ -24,28 +24,6 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
String MergingSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "MergingSorted(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// The order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns)
{
/// Read the first blocks, initialize the queue.

View File

@ -70,12 +70,12 @@ public:
String getName() const override { return "MergingSorted"; }
String getID() const override;
bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
struct RowRef
{

View File

@ -19,23 +19,40 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int NOT_IMPLEMENTED;
}
NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_, UInt64 server_revision_,
bool use_index_,
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
: istr(istr_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_)
: istr(istr_), server_revision(server_revision_),
use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_)
use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_)
{
if (use_index)
{
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
if (!istr_concrete)
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
if (!istr_concrete)
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
index_column_it = index_block_it->columns.begin();
if (index_block_it == index_block_end)
return;
index_column_it = index_block_it->columns.begin();
/// Initialize header from the index.
for (const auto & column : index_block_it->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert({ type->createColumn(), type, column.name });
}
}
@ -50,6 +67,12 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
}
Block NativeBlockInputStream::getHeader() const
{
return header;
}
Block NativeBlockInputStream::readImpl()
{
Block res;

View File

@ -60,35 +60,33 @@ struct IndexForNativeFormat
class NativeBlockInputStream : public IProfilingBlockInputStream
{
public:
/** If a non-zero server_revision is specified, additional block information may be expected and read.
*
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
*/
NativeBlockInputStream(
ReadBuffer & istr_, UInt64 server_revision_ = 0,
bool use_index_ = false,
IndexForNativeFormat::Blocks::const_iterator index_block_it_ = IndexForNativeFormat::Blocks::const_iterator{},
IndexForNativeFormat::Blocks::const_iterator index_block_end_ = IndexForNativeFormat::Blocks::const_iterator{});
/// If a non-zero server_revision is specified, additional block information may be expected and read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
/// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_);
String getName() const override { return "Native"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint);
Block getHeader() const override;
protected:
Block readImpl() override;
private:
ReadBuffer & istr;
Block header;
UInt64 server_revision;
bool use_index;
bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it;
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;

View File

@ -28,12 +28,7 @@ public:
String getName() const override { return "NullAndDoCopy"; }
String getID() const override
{
std::stringstream res;
res << "copy from " << input->getID();
return res.str();
}
Block getHeader() const override { return {}; }
protected:
Block readImpl() override

View File

@ -6,14 +6,19 @@
namespace DB
{
/** Empty stream of blocks.
/** Empty stream of blocks of specified structure.
*/
class NullBlockInputStream : public IBlockInputStream
{
public:
Block read() override { return Block(); }
NullBlockInputStream(const Block & header) : header(header) {}
Block read() override { return {}; }
Block getHeader() const override { return header; }
String getName() const override { return "Null"; }
private:
Block header;
};
}

View File

@ -17,18 +17,12 @@ namespace ErrorCodes
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
const BlockInputStreamPtr & input,
const Block & in_sample_, const Block & out_sample_)
: header(out_sample_)
{
buildActions(in_sample_, out_sample_);
children.push_back(input);
}
String NullableAdapterBlockInputStream::getID() const
{
std::stringstream res;
res << "NullableAdapterBlockInputStream(" << children.back()->getID() << ")";
return res.str();
}
Block NullableAdapterBlockInputStream::readImpl()
{
Block block = children.back()->read();

View File

@ -18,12 +18,11 @@ namespace DB
class NullableAdapterBlockInputStream : public IProfilingBlockInputStream
{
public:
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_,
const Block & out_sample_);
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, const Block & out_sample_);
String getName() const override { return "NullableAdapterBlockInputStream"; }
String getID() const override;
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
@ -52,6 +51,7 @@ private:
void buildActions(const Block & in_sample, const Block & out_sample);
private:
Block header;
Actions actions;
std::vector<std::optional<String>> rename;
bool must_transform = false;

View File

@ -16,6 +16,14 @@ public:
String getName() const override { return "One"; }
Block getHeader() const override
{
Block res;
for (const auto & elem : block)
res.insert({ elem.column->cloneEmpty(), elem.type, elem.name });
return res;
}
protected:
Block readImpl() override
{

View File

@ -20,13 +20,13 @@ public:
children.push_back(stream);
}
Block getHeader() const override { return children.at(0)->getHeader(); }
private:
Block readImpl() override { return stream->read(); }
String getName() const override { return "Owning"; }
String getID() const override { return "Owning(" + stream->getID() + ")"; }
protected:
BlockInputStreamPtr stream;
std::unique_ptr<OwnType> own;

View File

@ -29,23 +29,9 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
}
String ParallelAggregatingBlockInputStream::getID() const
Block ParallelAggregatingBlockInputStream::getHeader() const
{
std::stringstream res;
res << "ParallelAggregating(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ", " << aggregator.getID() << ")";
return res.str();
return aggregator.getHeader(final);
}
@ -122,8 +108,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
{
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].key_sizes, parent.threads_data[thread_num].key,
parent.no_more_keys);
parent.threads_data[thread_num].key, parent.no_more_keys);
parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes();
@ -212,6 +197,13 @@ void ParallelAggregatingBlockInputStream::execute()
<< "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
/// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0],
threads_data[0].key_columns, threads_data[0].aggregate_columns,
threads_data[0].key, no_more_keys);
}
}

View File

@ -27,10 +27,10 @@ public:
String getName() const override { return "ParallelAggregating"; }
String getID() const override;
void cancel() override;
Block getHeader() const override;
protected:
/// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor.
void readPrefix() override
@ -83,14 +83,12 @@ private:
StringRefs key;
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Sizes key_sizes;
ThreadData(size_t keys_size, size_t aggregates_size)
{
key.resize(keys_size);
key_columns.resize(keys_size);
aggregate_columns.resize(aggregates_size);
key_sizes.resize(keys_size);
}
};

View File

@ -23,22 +23,12 @@ public:
String getName() const override { return "PartialSorting"; }
String getID() const override
{
std::stringstream res;
res << "PartialSorting(" << children.back()->getID();
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -2,6 +2,7 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Common/NetException.h>
#include <Interpreters/Context.h>
#include <Interpreters/castColumn.h>
#include <Storages/IStorage.h>
@ -17,9 +18,9 @@ namespace ErrorCodes
RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection,
const String & query_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
if (settings)
context.setSettings(*settings);
@ -32,9 +33,9 @@ RemoteBlockInputStream::RemoteBlockInputStream(
RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
if (settings)
context.setSettings(*settings);
@ -48,9 +49,9 @@ RemoteBlockInputStream::RemoteBlockInputStream(
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings,
const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
: header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
if (settings)
context.setSettings(*settings);
@ -148,6 +149,25 @@ void RemoteBlockInputStream::sendExternalTables()
multiplexed_connections->sendExternalTablesData(external_tables_data);
}
/** If we receive a block with slightly different column types, or with excessive columns,
* we will adapt it to expected structure.
*/
static Block adaptBlockStructure(const Block & block, const Block & header, const Context & context)
{
/// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
if (!header)
return block;
Block res;
res.info = block.info;
for (const auto & elem : header)
res.insert({ castColumn(block.getByName(elem.name), elem.type, context), elem.type, elem.name });
return res;
}
Block RemoteBlockInputStream::readImpl()
{
if (!sent_query)
@ -170,7 +190,7 @@ Block RemoteBlockInputStream::readImpl()
case Protocol::Server::Data:
/// If the block is not empty and is not a header block
if (packet.block && (packet.block.rows() > 0))
return packet.block;
return adaptBlockStructure(packet.block, header, context);
break; /// If the block is empty - we will receive other packets before EndOfStream.
case Protocol::Server::Exception:

View File

@ -24,7 +24,7 @@ public:
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
Connection & connection,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -32,7 +32,7 @@ public:
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -40,7 +40,7 @@ public:
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -66,18 +66,13 @@ public:
String getName() const override { return "Remote"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
BlockExtraInfo getBlockExtraInfo() const override
{
return multiplexed_connections->getBlockExtraInfo();
}
Block getHeader() const override { return header; }
protected:
/// Send all temporary tables to remote servers
void sendExternalTables();
@ -95,10 +90,14 @@ protected:
private:
void sendQuery();
Block receiveBlock();
/// If wasn't sent yet, send request to cancell all connections to replicas
void tryCancel(const char * reason);
private:
Block header;
std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections;
std::unique_ptr<MultiplexedConnections> multiplexed_connections;

View File

@ -22,16 +22,15 @@ public:
String getName() const override { return "RemoveColumns"; }
String getID() const override
Block getHeader() const override
{
std::stringstream res;
res << "RemoveColumns(" << children.back()->getID();
Block res = children.back()->getHeader();
for (const auto & it : columns_to_remove)
res << ", " << it;
if (res.has(it))
res.erase(it);
res << ")";
return res.str();
return res;
}
protected:

View File

@ -24,23 +24,6 @@ public:
String getName() const override { return "ReplacingSorted"; }
String getID() const override
{
std::stringstream res;
res << "ReplacingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", version_column, " << version_column << ")";
return res.str();
}
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;

View File

@ -16,12 +16,7 @@ public:
String getName() const override { return "Squashing"; }
String getID() const override
{
std::stringstream res;
res << "Squashing(" << children.at(0)->getID() << ")";
return res.str();
}
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;

View File

@ -23,24 +23,6 @@ namespace ErrorCodes
}
String SummingSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "SummingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
{
for (auto & desc : columns_to_aggregate)

View File

@ -35,8 +35,6 @@ public:
String getName() const override { return "SummingSorted"; }
String getID() const override;
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;

View File

@ -1,6 +1,7 @@
#include <DataStreams/TotalsHavingBlockInputStream.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/AggregateDescription.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/FilterDescription.h>
@ -29,26 +30,18 @@ TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
}
String TotalsHavingBlockInputStream::getID() const
{
std::stringstream res;
res << "TotalsHavingBlockInputStream(" << children.back()->getID()
<< "," << filter_column_name << ")";
return res.str();
}
static void finalize(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.safeGetByPosition(i);
const ColumnAggregateFunction * unfinalized_column = typeid_cast<const ColumnAggregateFunction *>(current.column.get());
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_column)
if (unfinalized_type)
{
current.type = unfinalized_column->getAggregateFunction()->getReturnType();
current.column = unfinalized_column->convertToValues();
current.type = unfinalized_type->getReturnType();
if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
}
}
}
@ -70,7 +63,7 @@ const Block & TotalsHavingBlockInputStream::getTotals()
addToTotals(overflow_aggregates, nullptr);
}
totals = header.cloneWithColumns(std::move(current_totals));
totals = children.at(0)->getHeader().cloneWithColumns(std::move(current_totals));
finalize(totals);
}
@ -81,6 +74,16 @@ const Block & TotalsHavingBlockInputStream::getTotals()
}
Block TotalsHavingBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalize(res);
if (expression)
expression->execute(res);
return res;
}
Block TotalsHavingBlockInputStream::readImpl()
{
Block finalized;
@ -90,9 +93,6 @@ Block TotalsHavingBlockInputStream::readImpl()
{
block = children[0]->read();
if (!header)
header = block.cloneEmpty();
/// Block with values not included in `max_rows_to_group_by`. We'll postpone it.
if (overflow_row && block && block.info.is_overflows)
{

View File

@ -19,6 +19,7 @@ private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
/// expression may be nullptr
TotalsHavingBlockInputStream(
const BlockInputStreamPtr & input_,
bool overflow_row_, const ExpressionActionsPtr & expression_,
@ -26,10 +27,10 @@ public:
String getName() const override { return "TotalsHaving"; }
String getID() const override;
const Block & getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
@ -42,8 +43,6 @@ private:
size_t passed_keys = 0;
size_t total_keys = 0;
Block header;
/** Here are the values that did not pass max_rows_to_group_by.
* They are added or not added to the current_totals, depending on the totals_mode.
*/

View File

@ -86,26 +86,6 @@ public:
String getName() const override { return "Union"; }
String getID() const override
{
std::stringstream res;
res << "Union(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
~UnionBlockInputStream() override
{
try
@ -139,6 +119,8 @@ public:
return doGetBlockExtraInfo();
}
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
void finalize()
{

View File

@ -185,24 +185,6 @@ public:
String getName() const override { return "VersionedCollapsingSorted"; }
String getID() const override
{
std::stringstream res;
res << "VersionedCollapsingSortedBlockInputStream(inputs";
for (const auto & child : children)
res << ", " << child->getID();
res << ", description";
for (const auto & descr : description)
res << ", " << descr.getID();
res << ", sign_column, " << sign_column;
res << ", version_column, " << sign_column << ")";
return res.str();
}
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;

View File

@ -112,7 +112,7 @@ public:
bool equals(const IDataType & rhs) const override;
bool textCanContainOnlyValidUTF8() const override;
size_t getSizeOfValueInMemory() const override { return sizeof(Field); }
size_t getSizeOfValueInMemory() const override { return sizeof(FieldType); }
};

View File

@ -73,7 +73,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/
if (is_local)
return executeQuery(load_all_query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, context);
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
}
@ -103,7 +103,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, context);
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
}
}

View File

@ -8,13 +8,6 @@ DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count
{
}
String DictionaryBlockInputStreamBase::getID() const
{
std::stringstream ss;
ss << static_cast<const void*>(this);
return ss.str();
}
Block DictionaryBlockInputStreamBase::readImpl()
{
if (next_row == rows_count)
@ -26,4 +19,9 @@ Block DictionaryBlockInputStreamBase::readImpl()
return block;
}
Block DictionaryBlockInputStreamBase::getHeader() const
{
return getBlock(0, 0);
}
}

View File

@ -11,17 +11,16 @@ protected:
DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size);
String getID() const override;
virtual Block getBlock(size_t start, size_t length) const = 0;
Block getHeader() const override;
private:
const size_t rows_count;
const size_t max_block_size;
size_t next_row;
size_t next_row = 0;
Block readImpl() override;
void readPrefixImpl() override { next_row = 0; }
};
}

View File

@ -101,6 +101,8 @@ public:
}
}
Block getHeader() const override { return stream->getHeader(); };
private:
Block readImpl() override { return stream->read(); }
@ -118,7 +120,6 @@ private:
}
String getName() const override { return "WithBackgroundThread"; }
String getID() const override { return "WithBackgroundThread(" + stream->getID() + ")"; }
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;

View File

@ -14,6 +14,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int FILE_DOESNT_EXIST;
extern const int EXTERNAL_LIBRARY_ERROR;
}
@ -71,7 +72,11 @@ namespace
if (!data)
return sample_block.cloneEmpty();
auto columns_received = static_cast<const ClickHouseLibrary::ColumnsUInt64 *>(data);
auto columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception("Received error: " + std::to_string(columns_received->error_code) + " "
+ (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns(sample_block.columns());
for (const auto i : ext::range(0, columns.size()))
@ -86,17 +91,12 @@ namespace
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & type = sample_block.getByPosition(row_n).type;
if (type->isStringOrFixedString())
{
const auto & data = ext::bit_cast<const char *>(columns_received->data[col_n].data[row_n]);
auto len = strlen(data);
columns[row_n]->insertData(data, len);
}
else
{
columns[row_n]->insert(columns_received->data[col_n].data[row_n]);
}
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
continue;
const auto & size = field.size;
const auto & data = static_cast<const char *>(field.data);
columns[row_n]->insertData(data, size);
}
}
@ -157,11 +157,11 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
/// Get function pointer before dataAllocate call because library->get may throw.
auto fptr
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns))>("ClickHouseDictionary_v1_loadAll");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v1_dataAllocate")();
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns))>("ClickHouseDictionary_v2_loadAll");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v2_dataAllocate")();
auto data = fptr(data_ptr, &settings->strings, &columns);
auto block = dataToBlock(description.sample_block, data);
library->get<void (*)(void *)>("ClickHouseDictionary_v1_dataDelete")(data_ptr);
library->get<void (*)(void *)>("ClickHouseDictionary_v2_dataDelete")(data_ptr);
return std::make_shared<OneBlockInputStream>(block);
}
@ -183,11 +183,11 @@ BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> &
/// Get function pointer before dataAllocate call because library->get may throw.
auto fptr = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&ids_data))>(
"ClickHouseDictionary_v1_loadIds");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v1_dataAllocate")();
"ClickHouseDictionary_v2_loadIds");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v2_dataAllocate")();
auto data = fptr(data_ptr, &settings->strings, &columns_pass, &ids_data);
auto block = dataToBlock(description.sample_block, data);
library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v1_dataDelete")(data_ptr);
library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v2_dataDelete")(data_ptr);
return std::make_shared<OneBlockInputStream>(block);
}
@ -195,16 +195,6 @@ BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_column
{
LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size());
/*
auto columns_c = std::make_unique<ClickHouseLibrary::Columns>(key_columns.size() + 1);
size_t i = 0;
for (auto & column : key_columns)
{
columns_c[i] = column->getName().c_str();
++i;
}
columns_c[i] = nullptr;
*/
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(key_columns.size());
ClickHouseLibrary::CStrings columns_pass{
static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), key_columns.size()};
@ -221,17 +211,17 @@ BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_column
/// Get function pointer before dataAllocate call because library->get may throw.
auto fptr
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&requested_rows_c))>(
"ClickHouseDictionary_v1_loadKeys");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v1_dataAllocate")();
"ClickHouseDictionary_v2_loadKeys");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v2_dataAllocate")();
auto data = fptr(data_ptr, &settings->strings, &columns_pass, &requested_rows_c);
auto block = dataToBlock(description.sample_block, data);
library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v1_dataDelete")(data_ptr);
library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v2_dataDelete")(data_ptr);
return std::make_shared<OneBlockInputStream>(block);
}
bool LibraryDictionarySource::isModified() const
{
auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v1_isModified");
auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v2_isModified");
if (fptr)
return fptr(&settings->strings);
return true;
@ -239,7 +229,7 @@ bool LibraryDictionarySource::isModified() const
bool LibraryDictionarySource::supportsSelectiveLoad() const
{
auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v1_supportsSelectiveLoad");
auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v2_supportsSelectiveLoad");
if (fptr)
return fptr(&settings->strings);
return true;

View File

@ -2,6 +2,8 @@
#include <cstdint>
#define CLICKHOUSE_DICTIONARY_LIBRARY_API 1
namespace ClickHouseLibrary
{
using CString = const char *;
@ -25,4 +27,24 @@ struct ColumnsUInt64
VectorUInt64 * data = nullptr;
uint64_t size = 0;
};
struct Field
{
const void * data = nullptr;
uint64_t size = 0;
};
struct Row
{
const Field * data = nullptr;
uint64_t size = 0;
};
struct Table
{
const Row * data = nullptr;
uint64_t size = 0;
uint64_t error_code = 0; // 0 = ok; !0 = error, with message in error_string
const char * error_string = nullptr;
};
}

View File

@ -38,14 +38,6 @@ MongoDBBlockInputStream::MongoDBBlockInputStream(
MongoDBBlockInputStream::~MongoDBBlockInputStream() = default;
String MongoDBBlockInputStream::getID() const
{
std::ostringstream stream;
stream << cursor.get();
return "MongoDB(@" + stream.str() + ")";
}
namespace
{
using ValueType = ExternalResultDescription::ValueType;

View File

@ -32,7 +32,7 @@ public:
String getName() const override { return "MongoDB"; }
String getID() const override;
Block getHeader() const override { return description.sample_block; };
private:
Block readImpl() override;

View File

@ -33,12 +33,6 @@ MySQLBlockInputStream::MySQLBlockInputStream(
}
String MySQLBlockInputStream::getID() const
{
return "MySQL(" + query.str() + ")";
}
namespace
{
using ValueType = ExternalResultDescription::ValueType;

View File

@ -21,7 +21,7 @@ public:
String getName() const override { return "MySQL"; }
String getID() const override;
Block getHeader() const override { return description.sample_block; };
private:
Block readImpl() override;

View File

@ -38,12 +38,6 @@ ODBCBlockInputStream::ODBCBlockInputStream(
}
String ODBCBlockInputStream::getID() const
{
return "ODBC(" + statement.toString() + ")";
}
namespace
{
using ValueType = ExternalResultDescription::ValueType;

View File

@ -27,7 +27,7 @@ public:
String getName() const override { return "ODBC"; }
String getID() const override;
Block getHeader() const override { return description.sample_block; };
private:
Block readImpl() override;

View File

@ -23,6 +23,7 @@
#include <DataTypes/getMostSubtype.h>
#include <Core/TypeListNumber.h>
namespace DB
{
@ -156,7 +157,7 @@ public:
++index;
}
ColumnPtr getNullMapData() && { return std::move(sink_null_map_holder); }
ColumnPtr getNullMapColumnPtr() && { return std::move(sink_null_map_holder); }
private:
const UInt8 * src_null_map = nullptr;
@ -776,25 +777,25 @@ DataTypePtr FunctionArrayElement::getReturnTypeImpl(const DataTypes & arguments)
void FunctionArrayElement::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result)
{
/// Check nullability.
bool is_nullable_array = false;
bool is_array_of_nullable = false;
const ColumnArray * col_array = nullptr;
const ColumnArray * col_const_array = nullptr;
col_array = checkAndGetColumn<ColumnArray>(block.getByPosition(arguments[0]).column.get());
if (col_array)
is_nullable_array = col_array->getData().isColumnNullable();
is_array_of_nullable = col_array->getData().isColumnNullable();
else
{
col_const_array = checkAndGetColumnConstData<ColumnArray>(block.getByPosition(arguments[0]).column.get());
if (col_const_array)
is_nullable_array = col_const_array->getData().isColumnNullable();
is_array_of_nullable = col_const_array->getData().isColumnNullable();
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
}
if (!is_nullable_array)
if (!is_array_of_nullable)
{
ArrayImpl::NullMapBuilder builder;
perform(block, arguments, result, builder);
@ -860,7 +861,7 @@ void FunctionArrayElement::executeImpl(Block & block, const ColumnNumbers & argu
/// Store the result.
const ColumnWithTypeAndName & source_col = source_block.getByPosition(2);
ColumnWithTypeAndName & dest_col = block.getByPosition(result);
dest_col.column = ColumnNullable::create(source_col.column, std::move(builder).getNullMapData());
dest_col.column = ColumnNullable::create(source_col.column, builder ? std::move(builder).getNullMapColumnPtr() : ColumnUInt8::create());
}
}

View File

@ -53,6 +53,13 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_UUID;
extern const int TOO_LARGE_STRING_SIZE;
extern const int TOO_LESS_ARGUMENTS_FOR_FUNCTION;
extern const int LOGICAL_ERROR;
extern const int TYPE_MISMATCH;
extern const int CANNOT_CONVERT_TYPE;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NOT_IMPLEMENTED;
}
@ -508,9 +515,6 @@ struct ConvertImplGenericFromString
{
auto res = data_type_to.createColumn();
if (!size)
return;
IColumn & column_to = *res;
column_to.reserve(size);
@ -1272,9 +1276,7 @@ private:
static WrapperType createFixedStringWrapper(const DataTypePtr & from_type, const size_t N)
{
if (!from_type->isStringOrFixedString())
throw Exception{
"CAST AS FixedString is only implemented for types String and FixedString",
ErrorCodes::NOT_IMPLEMENTED};
throw Exception{"CAST AS FixedString is only implemented for types String and FixedString", ErrorCodes::NOT_IMPLEMENTED};
return [N] (Block & block, const ColumnNumbers & arguments, const size_t result)
{
@ -1309,9 +1311,7 @@ private:
/// both from_type and to_type should be nullptr now is array types had same dimensions
if ((from_type == nullptr) != (to_type == nullptr))
throw Exception{
"CAST AS Array can only be performed between same-dimensional array types or from String",
ErrorCodes::TYPE_MISMATCH};
throw Exception{"CAST AS Array can only be performed between same-dimensional array types or from String", ErrorCodes::TYPE_MISMATCH};
/// Prepare nested type conversion
const auto nested_function = prepare(from_nested_type, to_nested_type.get());
@ -1337,9 +1337,7 @@ private:
block.getByPosition(result).column = ColumnArray::create(nested_block.getByPosition(1).column, col_array->getOffsetsPtr());
}
else
throw Exception{
"Illegal column " + array_arg.column->getName() + " for function CAST AS Array",
ErrorCodes::LOGICAL_ERROR};
throw Exception{"Illegal column " + array_arg.column->getName() + " for function CAST AS Array", ErrorCodes::LOGICAL_ERROR};
};
}
@ -1356,16 +1354,12 @@ private:
const auto from_type = checkAndGetDataType<DataTypeTuple>(from_type_untyped.get());
if (!from_type)
throw Exception{
"CAST AS Tuple can only be performed between tuple types or from String.\nLeft type: " + from_type_untyped->getName() +
", right type: " + to_type->getName(),
ErrorCodes::TYPE_MISMATCH};
throw Exception{"CAST AS Tuple can only be performed between tuple types or from String.\nLeft type: " + from_type_untyped->getName() +
", right type: " + to_type->getName(), ErrorCodes::TYPE_MISMATCH};
if (from_type->getElements().size() != to_type->getElements().size())
throw Exception{
"CAST AS Tuple can only be performed between tuple types with the same number of elements or from String.\n"
"Left type: " + from_type->getName() + ", right type: " + to_type->getName(),
ErrorCodes::TYPE_MISMATCH};
throw Exception{"CAST AS Tuple can only be performed between tuple types with the same number of elements or from String.\n"
"Left type: " + from_type->getName() + ", right type: " + to_type->getName(), ErrorCodes::TYPE_MISMATCH};
const auto & from_element_types = from_type->getElements();
const auto & to_element_types = to_type->getElements();
@ -1441,10 +1435,8 @@ private:
};
}
else
throw Exception{
"Conversion from " + from_type->getName() + " to " + to_type->getName() +
" is not supported",
ErrorCodes::CANNOT_CONVERT_TYPE};
throw Exception{"Conversion from " + from_type->getName() + " to " + to_type->getName() +
" is not supported", ErrorCodes::CANNOT_CONVERT_TYPE};
}
template <typename EnumTypeFrom, typename EnumTypeTo>
@ -1467,10 +1459,8 @@ private:
const auto & old_value = name_value.second;
const auto & new_value = to_type->getValue(name_value.first);
if (old_value != new_value)
throw Exception{
"Enum conversion changes value for element '" + name_value.first +
"' from " + toString(old_value) + " to " + toString(new_value),
ErrorCodes::CANNOT_CONVERT_TYPE};
throw Exception{"Enum conversion changes value for element '" + name_value.first +
"' from " + toString(old_value) + " to " + toString(new_value), ErrorCodes::CANNOT_CONVERT_TYPE};
}
};
@ -1499,8 +1489,7 @@ private:
col_with_type_and_name.column = std::move(res);
}
else
throw Exception{
"Unexpected column " + first_col->getName() + " as first argument of function " + function_name,
throw Exception{"Unexpected column " + first_col->getName() + " as first argument of function " + function_name,
ErrorCodes::LOGICAL_ERROR};
};
}
@ -1540,8 +1529,7 @@ private:
/// Check that the requested conversion is allowed.
if (nullable_conversion.source_is_nullable && !nullable_conversion.result_is_nullable)
throw Exception{"Cannot convert data from a nullable type to a non-nullable type",
ErrorCodes::CANNOT_CONVERT_TYPE};
throw Exception{"Cannot convert data from a nullable type to a non-nullable type", ErrorCodes::CANNOT_CONVERT_TYPE};
if (from_type->onlyNull())
{
@ -1671,9 +1659,7 @@ private:
/// It's possible to use ConvertImplGenericFromString to convert from String to AggregateFunction,
/// but it is disabled because deserializing aggregate functions state might be unsafe.
throw Exception{
"Conversion from " + from_type->getName() + " to " + to_type->getName() + " is not supported",
ErrorCodes::CANNOT_CONVERT_TYPE};
throw Exception{"Conversion from " + from_type->getName() + " to " + to_type->getName() + " is not supported", ErrorCodes::CANNOT_CONVERT_TYPE};
}
};
@ -1709,8 +1695,7 @@ protected:
{
const auto type_col = checkAndGetColumnConst<ColumnString>(arguments.back().column.get());
if (!type_col)
throw Exception("Second argument to " + getName() + " must be a constant string describing type",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("Second argument to " + getName() + " must be a constant string describing type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return DataTypeFactory::instance().get(type_col->getValue<String>());
}

View File

@ -16,6 +16,7 @@ void registerFunctionsHigherOrder(FunctionFactory & factory)
factory.registerFunction<FunctionArrayFirstIndex>();
factory.registerFunction<FunctionArraySort>();
factory.registerFunction<FunctionArrayReverseSort>();
factory.registerFunction<FunctionArrayCumSum>();
}
}

View File

@ -594,6 +594,117 @@ struct ArraySortImpl
}
};
struct ArrayCumSumImpl
{
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }
static DataTypePtr getReturnType(const DataTypePtr & expression_return, const DataTypePtr & /*array_element*/)
{
if (checkDataType<DataTypeUInt8>(&*expression_return) ||
checkDataType<DataTypeUInt16>(&*expression_return) ||
checkDataType<DataTypeUInt32>(&*expression_return) ||
checkDataType<DataTypeUInt64>(&*expression_return))
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
if (checkDataType<DataTypeInt8>(&*expression_return) ||
checkDataType<DataTypeInt16>(&*expression_return) ||
checkDataType<DataTypeInt32>(&*expression_return) ||
checkDataType<DataTypeInt64>(&*expression_return))
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>());
if (checkDataType<DataTypeFloat32>(&*expression_return) ||
checkDataType<DataTypeFloat64>(&*expression_return))
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>());
throw Exception("arrayCumSum cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
template <typename Element, typename Result>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{
const ColumnVector<Element> * column = checkAndGetColumn<ColumnVector<Element>>(&*mapped);
if (!column)
{
const ColumnConst * column_const = checkAndGetColumnConst<ColumnVector<Element>>(&*mapped);
if (!column_const)
return false;
const Element x = column_const->template getValue<Element>();
const IColumn::Offsets & offsets = array.getOffsets();
auto res_nested = ColumnVector<Result>::create();
typename ColumnVector<Result>::Container & res_values = res_nested->getData();
res_values.resize(column_const->size());
size_t pos = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
// skip empty arrays
if (pos < offsets[i])
{
res_values[pos++] = x;
for (; pos < offsets[i]; ++pos)
{
res_values[pos] = res_values[pos - 1] + x;
}
}
}
res_ptr = ColumnArray::create(std::move(res_nested), array.getOffsetsPtr());
return true;
}
const IColumn::Offsets & offsets = array.getOffsets();
const typename ColumnVector<Element>::Container & data = column->getData();
auto res_nested = ColumnVector<Result>::create();
typename ColumnVector<Result>::Container & res_values = res_nested->getData();
res_values.resize(data.size());
size_t pos = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
// skip empty arrays
if (pos < offsets[i])
{
res_values[pos] = data[pos];
for (++pos; pos < offsets[i]; ++pos)
{
res_values[pos] = res_values[pos - 1] + data[pos];
}
}
}
res_ptr = ColumnArray::create(std::move(res_nested), array.getOffsetsPtr());
return true;
}
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
ColumnPtr res;
if (executeType< UInt8 , UInt64>(mapped, array, res) ||
executeType< UInt16, UInt64>(mapped, array, res) ||
executeType< UInt32, UInt64>(mapped, array, res) ||
executeType< UInt64, UInt64>(mapped, array, res) ||
executeType< Int8 , Int64>(mapped, array, res) ||
executeType< Int16, Int64>(mapped, array, res) ||
executeType< Int32, Int64>(mapped, array, res) ||
executeType< Int64, Int64>(mapped, array, res) ||
executeType<Float32,Float64>(mapped, array, res) ||
executeType<Float64,Float64>(mapped, array, res))
return res;
else
throw Exception("Unexpected column for arrayCumSum: " + mapped->getName());
}
};
template <typename Impl, typename Name>
class FunctionArrayMapped : public IFunction
@ -846,6 +957,7 @@ struct NameArrayFirst { static constexpr auto name = "arrayFirst"; };
struct NameArrayFirstIndex { static constexpr auto name = "arrayFirstIndex"; };
struct NameArraySort { static constexpr auto name = "arraySort"; };
struct NameArrayReverseSort { static constexpr auto name = "arrayReverseSort"; };
struct NameArrayCumSum { static constexpr auto name = "arrayCumSum"; };
using FunctionArrayMap = FunctionArrayMapped<ArrayMapImpl, NameArrayMap>;
using FunctionArrayFilter = FunctionArrayMapped<ArrayFilterImpl, NameArrayFilter>;
@ -857,5 +969,6 @@ using FunctionArrayFirst = FunctionArrayMapped<ArrayFirstImpl, NameArrayFirst>;
using FunctionArrayFirstIndex = FunctionArrayMapped<ArrayFirstIndexImpl, NameArrayFirstIndex>;
using FunctionArraySort = FunctionArrayMapped<ArraySortImpl<true>, NameArraySort>;
using FunctionArrayReverseSort = FunctionArrayMapped<ArraySortImpl<false>, NameArrayReverseSort>;
using FunctionArrayCumSum = FunctionArrayMapped<ArrayCumSumImpl, NameArrayCumSum>;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <common/exp10.h>
#include <common/preciseExp10.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
@ -484,7 +484,7 @@ using FunctionExp = FunctionMathUnaryFloat64<UnaryFunctionVectorized<ExpName, ex
using FunctionLog = FunctionMathUnaryFloat64<UnaryFunctionVectorized<LogName, log>>;
using FunctionExp2 = FunctionMathUnaryFloat64<UnaryFunctionVectorized<Exp2Name, exp2>>;
using FunctionLog2 = FunctionMathUnaryFloat64<UnaryFunctionVectorized<Log2Name, log2>>;
using FunctionExp10 = FunctionMathUnaryFloat64<UnaryFunctionVectorized<Exp10Name, exp10>>;
using FunctionExp10 = FunctionMathUnaryFloat64<UnaryFunctionVectorized<Exp10Name, preciseExp10>>;
using FunctionLog10 = FunctionMathUnaryFloat64<UnaryFunctionVectorized<Log10Name, log10>>;
using FunctionSqrt = FunctionMathUnaryFloat64<UnaryFunctionVectorized<SqrtName, sqrt>>;

View File

@ -599,30 +599,17 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
const IColumn * col = block.getByPosition(arguments[0]).column.get();
double seconds;
size_t size = col->size();
if (auto column = checkAndGetColumnConst<ColumnVector<Float64>>(col))
seconds = column->getValue<Float64>();
else if (auto column = checkAndGetColumnConst<ColumnVector<Float32>>(col))
seconds = static_cast<double>(column->getValue<Float64>());
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt64>>(col))
seconds = static_cast<double>(column->getValue<UInt64>());
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt32>>(col))
seconds = static_cast<double>(column->getValue<UInt32>());
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt16>>(col))
seconds = static_cast<double>(column->getValue<UInt16>());
else if (auto column = checkAndGetColumnConst<ColumnVector<UInt8>>(col))
seconds = static_cast<double>(column->getValue<UInt8>());
else
if (!col->isColumnConst())
throw Exception("The argument of function " + getName() + " must be constant.", ErrorCodes::ILLEGAL_COLUMN);
Float64 seconds = applyVisitor(FieldVisitorConvertToNumber<Float64>(), static_cast<const ColumnConst &>(*col).getField());
if (seconds < 0)
throw Exception("Cannot sleep negative amount of time (not implemented)", ErrorCodes::BAD_ARGUMENTS);
size_t size = col->size();
/// We do not sleep if the block is empty.
if (size > 0)
{

View File

@ -65,6 +65,8 @@ public:
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2}; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
@ -189,13 +191,13 @@ public:
private:
void executeConst(Block & block, const ColumnNumbers & arguments, const size_t result)
{
/// Construct a block of full-size columns of size 1 and compute the function as usual.
/// Materialize the input column and compute the function as usual.
Block tmp_block;
ColumnNumbers tmp_arguments;
tmp_block.insert(block.getByPosition(arguments[0]));
tmp_block.getByPosition(0).column = static_cast<const ColumnConst *>(tmp_block.getByPosition(0).column.get())->getDataColumnPtr();
tmp_block.getByPosition(0).column = tmp_block.getByPosition(0).column->cloneResized(block.rows())->convertToFullColumnIfConst();
tmp_arguments.push_back(0);
for (size_t i = 1; i < arguments.size(); ++i)
@ -209,9 +211,7 @@ private:
execute(tmp_block, tmp_arguments, tmp_result);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(
block.rows(),
(*tmp_block.getByPosition(tmp_result).column)[0]);
block.getByPosition(result).column = tmp_block.getByPosition(tmp_result).column;
}
template <typename T>
@ -727,7 +727,7 @@ private:
/// Different versions of the hash tables to implement the mapping.
using NumToNum = HashMap<UInt64, UInt64, HashCRC32<UInt64>>;
using NumToString = HashMap <UInt64, StringRef, HashCRC32 <UInt64 >>; /// Everywhere StringRef's with trailing zero.
using NumToString = HashMap <UInt64, StringRef, HashCRC32<UInt64>>; /// Everywhere StringRef's with trailing zero.
using StringToNum = HashMap<StringRef, UInt64, StringRefHash>;
using StringToString = HashMap<StringRef, StringRef, StringRefHash>;
@ -740,7 +740,7 @@ private:
Field const_default_value; /// Null, if not specified.
bool initialized = false;
std::atomic<bool> initialized {false};
std::mutex mutex;
/// Can be called from different threads. It works only on the first call.

View File

@ -213,7 +213,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf)
else
x = converter.StringToFloat(buf.position(), buf.buffer().end() - buf.position(), &num_processed_characters);
if (num_processed_characters <= 0)
if (num_processed_characters < 0)
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
@ -317,6 +317,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
bool negative = false;
x = 0;
UInt64 before_point = 0;
UInt64 after_point = 0;
int after_point_exponent = 0;
@ -446,13 +447,6 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
}
return ReturnType(false);
}
else
{
if constexpr (throw_exception)
throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER);
else
return false;
}
}
return ReturnType(true);

View File

@ -6,6 +6,7 @@
#include <Common/setThreadName.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
@ -88,31 +89,55 @@ void AggregatedDataVariants::convertToTwoLevel()
}
void Aggregator::Params::calculateColumnNumbers(const Block & block)
Block Aggregator::getHeader(bool final) const
{
if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
keys.push_back(block.getPositionByName(*it));
Block res;
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
if (it->arguments.empty() && !it->argument_names.empty())
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
if (params.src_header)
{
for (size_t i = 0; i < params.keys_size; ++i)
res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty());
for (size_t i = 0; i < params.aggregates_size; ++i)
{
size_t arguments_size = params.aggregates[i].arguments.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = params.src_header.safeGetByPosition(params.aggregates[i].arguments[j]).type;
DataTypePtr type;
if (final)
type = params.aggregates[i].function->getReturnType();
else
type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
res.insert({ type->createColumn(), type, params.aggregates[i].column_name });
}
}
else if (params.intermediate_header)
{
res = params.intermediate_header.cloneEmpty();
if (final)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
{
auto & elem = res.getByPosition(params.keys_size + i);
elem.type = params.aggregates[i].function->getReturnType();
elem.column = elem.type->createColumn();
}
}
}
return res;
}
void Aggregator::initialize(const Block & block)
Aggregator::Aggregator(const Params & params_)
: params(params_),
isCancelled([]() { return false; })
{
if (isCancelled())
return;
std::lock_guard<std::mutex> lock(mutex);
if (initialized)
return;
initialized = true;
if (current_memory_tracker)
memory_usage_before_aggregation = current_memory_tracker->get();
@ -134,56 +159,7 @@ void Aggregator::initialize(const Block & block)
all_aggregates_has_trivial_destructor = false;
}
if (isCancelled())
return;
/** All below, if non-empty block passed.
* (it doesn't needed in methods that merging blocks with aggregation states).
*/
if (!block)
return;
/// Transform names of columns to numbers.
params.calculateColumnNumbers(block);
if (isCancelled())
return;
/// Create "header" block, describing result.
if (!sample)
{
for (size_t i = 0; i < params.keys_size; ++i)
{
sample.insert(block.safeGetByPosition(params.keys[i]).cloneEmpty());
if (ColumnPtr converted = sample.safeGetByPosition(i).column->convertToFullColumnIfConst())
sample.safeGetByPosition(i).column = converted;
}
for (size_t i = 0; i < params.aggregates_size; ++i)
{
ColumnWithTypeAndName col;
col.name = params.aggregates[i].column_name;
size_t arguments_size = params.aggregates[i].arguments.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = block.safeGetByPosition(params.aggregates[i].arguments[j]).type;
col.type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
col.column = col.type->createColumn();
sample.insert(std::move(col));
}
}
}
void Aggregator::setSampleBlock(const Block & block)
{
std::lock_guard<std::mutex> lock(mutex);
if (!sample)
sample = block.cloneEmpty();
method = chooseAggregationMethod();
}
@ -377,102 +353,70 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
}
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) const
AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
{
/// If no keys. All aggregating to single row.
if (params.keys_size == 0)
return AggregatedDataVariants::Type::without_key;
/// Check if at least one of the specified keys is nullable.
/// Create a set of nested key columns from the corresponding key columns.
/// Here "nested" means that, if a key column is nullable, we take its nested
/// column; otherwise we take the key column as is.
ColumnRawPtrs nested_key_columns;
nested_key_columns.reserve(key_columns.size());
DataTypes types_removed_nullable;
types_removed_nullable.reserve(params.keys.size());
bool has_nullable_key = false;
for (const auto & col : key_columns)
for (const auto & pos : params.keys)
{
if (col->isColumnNullable())
const auto & type = (params.src_header ? params.src_header : params.intermediate_header).safeGetByPosition(pos).type;
if (type->isNullable())
{
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(*col);
nested_key_columns.push_back(&nullable_col.getNestedColumn());
has_nullable_key = true;
types_removed_nullable.push_back(removeNullable(type));
}
else
nested_key_columns.push_back(col);
types_removed_nullable.push_back(type);
}
/** Returns ordinary (not two-level) methods, because we start from them.
* Later, during aggregation process, data may be converted (partitioned) to two-level structure, if cardinality is high.
*/
bool all_fixed = true;
size_t keys_bytes = 0;
size_t num_array_keys = 0;
bool has_arrays_of_non_fixed_elems = false;
bool all_non_array_keys_are_fixed = true;
bool has_tuples = false;
bool has_arrays_of_nullable = false;
size_t num_contiguous_keys = 0;
size_t num_fixed_contiguous_keys = 0;
size_t num_string_keys = 0;
key_sizes.resize(params.keys_size);
for (size_t j = 0; j < params.keys_size; ++j)
{
if (nested_key_columns[j]->isFixedAndContiguous())
if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
{
key_sizes[j] = nested_key_columns[j]->sizeOfValueIfFixed();
keys_bytes += key_sizes[j];
}
else
{
all_fixed = false;
++num_contiguous_keys;
if (const ColumnArray * arr = typeid_cast<const ColumnArray *>(nested_key_columns[j]))
if (types_removed_nullable[j]->isValueUnambiguouslyRepresentedInFixedSizeContiguousMemoryRegion())
{
++num_array_keys;
if (arr->getData().isColumnNullable())
has_arrays_of_nullable = true;
if (!arr->getData().isFixedAndContiguous())
has_arrays_of_non_fixed_elems = true;
++num_fixed_contiguous_keys;
key_sizes[j] = types_removed_nullable[j]->getSizeOfValueInMemory();
keys_bytes += key_sizes[j];
}
else
{
all_non_array_keys_are_fixed = false;
if (typeid_cast<const ColumnTuple *>(nested_key_columns[j]))
has_tuples = true;
if (types_removed_nullable[j]->isString())
{
++num_string_keys;
}
}
}
/// If no keys. All aggregating to single row.
if (params.keys_size == 0)
return AggregatedDataVariants::Type::without_key;
if (has_nullable_key || has_arrays_of_nullable)
if (has_nullable_key)
{
/// At least one key is nullable. Therefore we choose an aggregation method
/// that takes into account this fact.
if ((params.keys_size == 1) && (nested_key_columns[0]->isNumeric()))
{
/// We have exactly one key and it is nullable. We shall add it a tag
/// which specifies whether its value is null or not.
size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
if ((size_of_field == 1) || (size_of_field == 2) || (size_of_field == 4) || (size_of_field == 8) || (size_of_field == 16))
return AggregatedDataVariants::Type::nullable_keys128;
else
throw Exception{"Logical error: numeric column has sizeOfField not in 1, 2, 4, 8, 16.",
ErrorCodes::LOGICAL_ERROR};
}
if (all_fixed)
if (params.keys_size == num_fixed_contiguous_keys)
{
/// Pack if possible all the keys along with information about which key values are nulls
/// into a fixed 16- or 32-byte blob.
if (keys_bytes > (std::numeric_limits<size_t>::max() - std::tuple_size<KeysNullMap<UInt128>>::value))
throw Exception{"Aggregator: keys sizes overflow", ErrorCodes::LOGICAL_ERROR};
if ((std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes) <= 16)
if (std::tuple_size<KeysNullMap<UInt128>>::value + keys_bytes <= 16)
return AggregatedDataVariants::Type::nullable_keys128;
if ((std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes) <= 32)
if (std::tuple_size<KeysNullMap<UInt256>>::value + keys_bytes <= 32)
return AggregatedDataVariants::Type::nullable_keys256;
}
@ -483,9 +427,9 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
/// No key has been found to be nullable.
/// Single numeric key.
if ((params.keys_size == 1) && nested_key_columns[0]->isNumeric())
if (params.keys_size == 1 && types_removed_nullable[0]->isValueRepresentedByNumber())
{
size_t size_of_field = nested_key_columns[0]->sizeOfValueIfFixed();
size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();
if (size_of_field == 1)
return AggregatedDataVariants::Type::key8;
if (size_of_field == 2)
@ -500,23 +444,24 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
}
/// If all keys fits in N bits, will use hash table with all keys packed (placed contiguously) to single N-bit key.
if (all_fixed && keys_bytes <= 16)
return AggregatedDataVariants::Type::keys128;
if (all_fixed && keys_bytes <= 32)
return AggregatedDataVariants::Type::keys256;
if (params.keys_size == num_fixed_contiguous_keys)
{
if (keys_bytes <= 16)
return AggregatedDataVariants::Type::keys128;
if (keys_bytes <= 32)
return AggregatedDataVariants::Type::keys256;
}
/// If single string key - will use hash table with references to it. Strings itself are stored separately in Arena.
if (params.keys_size == 1 && typeid_cast<const ColumnString *>(nested_key_columns[0]))
if (params.keys_size == 1 && types_removed_nullable[0]->isString())
return AggregatedDataVariants::Type::key_string;
if (params.keys_size == 1 && typeid_cast<const ColumnFixedString *>(nested_key_columns[0]))
if (params.keys_size == 1 && types_removed_nullable[0]->isFixedString())
return AggregatedDataVariants::Type::key_fixed_string;
/** If some keys are arrays.
* If there is no more than one key that is array, and it is array of fixed-size elements, and all other keys are fixed-size,
* then it is possible to use 'concat' method (due to one-to-one correspondense). Otherwise the method will be 'serialized'.
/** If it is possible to use 'concat' method due to one-to-one correspondense. Otherwise the method will be 'serialized'.
*/
if (num_array_keys == 1 && !has_arrays_of_non_fixed_elems && all_non_array_keys_are_fixed)
if (params.keys_size == num_contiguous_keys && num_fixed_contiguous_keys + 1 >= num_contiguous_keys)
return AggregatedDataVariants::Type::concat;
/** For case with multiple strings, we use 'concat' method despite the fact, that correspondense is not one-to-one.
@ -524,11 +469,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ColumnRaw
* But if strings contains zero bytes in between, different keys may clash.
* For example, keys ('a\0b', 'c') and ('a', 'b\0c') will be aggregated as one key.
* This is documented behaviour. It may be avoided by just switching to 'serialized' method, which is less efficient.
*
* Some of aggregation keys may be tuples. In most cases, tuples are flattened in expression analyzer and not passed here.
* But in rare cases, they are not flattened. Will fallback to 'serialized' method for simplicity.
*/
if (num_array_keys == 0 && !has_tuples)
if (params.keys_size == num_fixed_contiguous_keys + num_string_keys)
return AggregatedDataVariants::Type::concat;
return AggregatedDataVariants::Type::serialized;
@ -706,13 +648,10 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
}
bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns,
Sizes & key_sizes, StringRefs & key,
bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, StringRefs & key,
bool & no_more_keys)
{
initialize(block);
if (isCancelled())
return true;
@ -769,7 +708,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
/// How to perform the aggregation?
if (result.empty())
{
result.init(chooseAggregationMethod(key_columns, key_sizes));
result.init(method);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
@ -1056,7 +995,6 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
StringRefs key(params.keys_size);
ColumnRawPtrs key_columns(params.keys_size);
AggregateColumns aggregate_columns(params.aggregates_size);
Sizes key_sizes;
/** Used if there is a limit on the maximum number of rows in the aggregation,
* and if group_by_overflow_mode == ANY.
@ -1081,14 +1019,17 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();
if (!executeOnBlock(block, result,
key_columns, aggregate_columns, key_sizes, key,
no_more_keys))
if (!executeOnBlock(block, result, key_columns, aggregate_columns, key, no_more_keys))
break;
}
/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, key, no_more_keys);
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.size();
size_t rows = result.sizeWithoutOverflowRow();
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Aggregated. " << src_rows << " to " << rows << " rows (from " << src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
@ -1174,9 +1115,11 @@ Block Aggregator::prepareBlockAndFill(
MutableColumns final_aggregate_columns(params.aggregates_size);
AggregateColumnsData aggregate_columns_data(params.aggregates_size);
Block header = getHeader(final);
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = sample.safeGetByPosition(i).column->cloneEmpty();
key_columns[i] = header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}
@ -1184,7 +1127,7 @@ Block Aggregator::prepareBlockAndFill(
{
if (!final)
{
aggregate_columns[i] = sample.safeGetByPosition(i + params.keys_size).column->cloneEmpty();
aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
@ -1213,7 +1156,7 @@ Block Aggregator::prepareBlockAndFill(
filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.key_sizes, final);
Block res = sample.cloneEmpty();
Block res = header.cloneEmpty();
for (size_t i = 0; i < params.keys_size; ++i)
res.getByPosition(i).column = std::move(key_columns[i]);
@ -1221,18 +1164,13 @@ Block Aggregator::prepareBlockAndFill(
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (final)
{
res.getByPosition(i + params.keys_size).type = aggregate_functions[i]->getReturnType();
res.getByPosition(i + params.keys_size).column = std::move(final_aggregate_columns[i]);
}
else
{
res.getByPosition(i + params.keys_size).column = std::move(aggregate_columns[i]);
}
}
/// Change the size of the columns-constants in the block.
size_t columns = sample.columns();
size_t columns = header.columns();
for (size_t i = 0; i < columns; ++i)
if (res.getByPosition(i).column->isColumnConst())
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);
@ -1653,12 +1591,7 @@ public:
String getName() const override { return "MergingAndConverting"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
Block getHeader() const override { return aggregator.getHeader(final); }
~MergingAndConvertingBlockInputStream()
{
@ -1846,7 +1779,7 @@ std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
non_empty_data.push_back(data);
if (non_empty_data.empty())
return std::make_unique<NullBlockInputStream>();
return std::make_unique<NullBlockInputStream>(getHeader(final));
if (non_empty_data.size() > 1)
{
@ -2023,14 +1956,6 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads)
{
if (isCancelled())
return;
StringRefs key(params.keys_size);
ColumnRawPtrs key_columns(params.keys_size);
initialize({});
if (isCancelled())
return;
@ -2062,15 +1987,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
if (bucket_to_blocks.empty())
return;
setSampleBlock(bucket_to_blocks.begin()->second.front());
/// How to perform the aggregation?
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = sample.safeGetByPosition(i).column.get();
Sizes key_sizes;
AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
/** `minus one` means the absence of information about the bucket
* - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
* If there is at least one block with a bucket number greater than zero, then there was a two-level aggregation.
@ -2111,7 +2027,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
LOG_TRACE(log, "Merging partially aggregated two-level data.");
auto merge_bucket = [&bucket_to_blocks, &result, &key_sizes, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, MemoryTracker * memory_tracker)
{
current_memory_tracker = memory_tracker;
@ -2122,7 +2038,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
mergeStreamsImpl(block, result.key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -2190,7 +2106,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(block, result.key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -2214,32 +2130,19 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << bucket_num << ").");
Stopwatch watch;
StringRefs key(params.keys_size);
ColumnRawPtrs key_columns(params.keys_size);
initialize({});
setSampleBlock(blocks.front());
/// How to perform the aggregation?
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = sample.safeGetByPosition(i).column.get();
Sizes key_sizes;
AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
/** If possible, change 'method' to some_hash64. Otherwise, leave as is.
* Better hash function is needed because during external aggregation,
* we may merge partitions of data with total number of keys far greater than 4 billion.
*/
#define APPLY_FOR_VARIANTS_THAT_MAY_USE_BETTER_HASH_FUNCTION(M) \
M(key64) \
M(key_string) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(concat) \
M(serialized) \
M(keys128) \
M(keys256) \
M(concat) \
M(serialized) \
#define M(NAME) \
if (method == AggregatedDataVariants::Type::NAME) \
@ -2377,20 +2280,16 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
if (!block)
return {};
initialize({});
setSampleBlock(block);
AggregatedDataVariants data;
StringRefs key(params.keys_size);
ColumnRawPtrs key_columns(params.keys_size);
Sizes key_sizes;
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = block.safeGetByPosition(i).column.get();
AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
AggregatedDataVariants::Type type = method;
data.keys_size = params.keys_size;
data.key_sizes = key_sizes;
@ -2496,30 +2395,6 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
}
String Aggregator::getID() const
{
std::stringstream res;
if (params.keys.empty())
{
res << "key_names";
for (size_t i = 0; i < params.key_names.size(); ++i)
res << ", " << params.key_names[i];
}
else
{
res << "keys";
for (size_t i = 0; i < params.keys.size(); ++i)
res << ", " << params.keys[i];
}
res << ", aggregates";
for (size_t i = 0; i < params.aggregates_size; ++i)
res << ", " << params.aggregates[i].column_name;
return res.str();
}
void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
{
isCancelled = cancellation_hook;

View File

@ -812,8 +812,8 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type_)
{
case Type::EMPTY: break;
case Type::without_key: break;
case Type::EMPTY: break;
case Type::without_key: break;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: NAME = std::make_unique<decltype(NAME)::element_type>(); break;
@ -832,8 +832,8 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type)
{
case Type::EMPTY: return 0;
case Type::without_key: return 1;
case Type::EMPTY: return 0;
case Type::without_key: return 1;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: return NAME->data.size() + (without_key != nullptr);
@ -850,8 +850,8 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type)
{
case Type::EMPTY: return 0;
case Type::without_key: return 1;
case Type::EMPTY: return 0;
case Type::without_key: return 1;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: return NAME->data.size();
@ -867,8 +867,8 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type)
{
case Type::EMPTY: return "EMPTY";
case Type::without_key: return "without_key";
case Type::EMPTY: return "EMPTY";
case Type::without_key: return "without_key";
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: return #NAME;
@ -884,8 +884,8 @@ struct AggregatedDataVariants : private boost::noncopyable
{
switch (type)
{
case Type::EMPTY: return false;
case Type::without_key: return false;
case Type::EMPTY: return false;
case Type::without_key: return false;
#define M(NAME, IS_TWO_LEVEL) \
case Type::NAME: return IS_TWO_LEVEL;
@ -900,25 +900,25 @@ struct AggregatedDataVariants : private boost::noncopyable
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed) \
M(concat) \
M(serialized) \
M(nullable_keys128) \
M(nullable_keys256) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed) \
M(concat) \
M(serialized) \
M(nullable_keys128) \
M(nullable_keys256) \
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
M(key8) \
M(key8) \
M(key16) \
M(key64_hash64) \
M(key_string_hash64) \
M(key_string_hash64)\
M(key_fixed_string_hash64) \
M(keys128_hash64) \
M(keys256_hash64) \
M(concat_hash64) \
M(keys128_hash64) \
M(keys256_hash64) \
M(concat_hash64) \
M(serialized_hash64) \
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
@ -943,16 +943,16 @@ struct AggregatedDataVariants : private boost::noncopyable
void convertToTwoLevel();
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
M(key32_two_level) \
M(key64_two_level) \
M(key_string_two_level) \
M(key_fixed_string_two_level) \
M(keys128_two_level) \
M(keys256_two_level) \
M(hashed_two_level) \
M(concat_two_level) \
M(serialized_two_level) \
M(nullable_keys128_two_level) \
M(key32_two_level) \
M(key64_two_level) \
M(key_string_two_level) \
M(key_fixed_string_two_level) \
M(keys128_two_level) \
M(keys256_two_level) \
M(hashed_two_level) \
M(concat_two_level) \
M(serialized_two_level) \
M(nullable_keys128_two_level) \
M(nullable_keys256_two_level)
};
@ -979,9 +979,13 @@ class Aggregator
public:
struct Params
{
/// Data structure of source blocks.
Block src_header;
/// Data structure of intermediate blocks before merge.
Block intermediate_header;
/// What to count.
Names key_names;
ColumnNumbers keys; /// The column numbers are computed later.
ColumnNumbers keys;
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
@ -1005,38 +1009,45 @@ public:
/// Settings to flush temporary data to the filesystem (external aggregation).
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
const std::string tmp_path;
Params(
const Names & key_names_, const AggregateDescriptions & aggregates_,
const Block & src_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_,
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_, const std::string & tmp_path_)
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_)
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_)
{
std::sort(key_names.begin(), key_names.end());
key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end());
keys_size = key_names.size();
}
/// Only parameters that matter during merge.
Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_)
: Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") {}
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "")
{
intermediate_header = intermediate_header_;
}
/// Compute the column numbers in `keys` and `aggregates`.
/// Calculate the column numbers in `keys` and `aggregates`.
void calculateColumnNumbers(const Block & block);
};
Aggregator(const Params & params_)
: params(params_),
isCancelled([]() { return false; })
{
}
Aggregator(const Params & params_);
/// Aggregate the source. Get the result in the form of one of the data structures.
void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result);
@ -1047,9 +1058,9 @@ public:
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(Block & block, AggregatedDataVariants & result,
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
Sizes & key_sizes, StringRefs & keys, /// - pass the corresponding objects that are initially empty.
StringRefs & keys, /// - pass the corresponding objects that are initially empty.
bool & no_more_keys);
/** Convert the aggregation data structure into a block.
@ -1087,9 +1098,6 @@ public:
*/
void setCancellationHook(const CancellationHook cancellation_hook);
/// For IBlockInputStream.
String getID() const;
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
@ -1111,12 +1119,18 @@ public:
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
/// Get data structure of the result.
Block getHeader(bool final) const;
protected:
friend struct AggregatedDataVariants;
friend class MergingAndConvertingBlockInputStream;
Params params;
AggregatedDataVariants::Type method;
Sizes key_sizes;
AggregateFunctionsPlainPtrs aggregate_functions;
/** This array serves two purposes.
@ -1145,12 +1159,8 @@ protected:
/// How many RAM were used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;
/// To initialize from the first block when used concurrently.
bool initialized = false;
std::mutex mutex;
Block sample;
Logger * log = &Logger::get("Aggregator");
/** Dynamically compiled library for aggregation, if any.
@ -1179,18 +1189,8 @@ protected:
/// For external aggregation.
TemporaryFiles temporary_files;
/** If only the column names (key_names, and also aggregates[i].column_name) are specified, then calculate the column numbers.
* Generate block - sample of the result. It is used in the convertToBlocks, mergeAndConvertToBlocks methods.
*/
void initialize(const Block & block);
/** Set the block - sample of the result,
* only if it has not already been set.
*/
void setSampleBlock(const Block & block);
/** Select the aggregation method based on the number and types of keys. */
AggregatedDataVariants::Type chooseAggregationMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes) const;
AggregatedDataVariants::Type chooseAggregationMethod();
/** Create states of aggregate functions for one key.
*/

Some files were not shown because too many files have changed in this diff Show More