merged with master

This commit is contained in:
Nikolai Kochetov 2018-02-20 12:27:47 +03:00
commit af9ac7b48b
406 changed files with 5542 additions and 3741 deletions

2
.gitignore vendored
View File

@ -10,6 +10,8 @@
*.logrt *.logrt
build build
/docs/en_single_page/
/docs/ru_single_page/
# callgrind files # callgrind files
callgrind.out.* callgrind.out.*

View File

@ -3,23 +3,23 @@ language: generic
matrix: matrix:
fast_finish: true fast_finish: true
include: include:
- os: linux # - os: linux
#
cache: # cache:
ccache: true # ccache: true
timeout: 1000 # timeout: 1000
#
addons: # addons:
apt: # apt:
sources: # sources:
- ubuntu-toolchain-r-test # - ubuntu-toolchain-r-test
packages: [ g++-7, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libzookeeper-mt-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo ] # packages: [ g++-7, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libzookeeper-mt-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo ]
#
env: # env:
- MATRIX_EVAL="export CC=gcc-7 && export CXX=g++-7" # - MATRIX_EVAL="export CC=gcc-7 && export CXX=g++-7"
#
script: # script:
- env TEST_RUN= utils/travis/normal.sh # - env TEST_RUN= utils/travis/normal.sh
# We need to have gcc7 headers to compile c++17 code on clang # We need to have gcc7 headers to compile c++17 code on clang
@ -28,6 +28,8 @@ matrix:
cache: cache:
ccache: true ccache: true
timeout: 1000 timeout: 1000
directories:
- /home/travis/.ccache
addons: addons:
apt: apt:
@ -43,6 +45,27 @@ matrix:
- utils/travis/normal.sh - utils/travis/normal.sh
# TODO: fix internal compiler
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export DEB_CC=clang-5.0 && export DEB_CXX=clang++-5.0"
#
# script:
# - utils/travis/pbuilder.sh
- os: linux - os: linux
sudo: required sudo: required
@ -56,31 +79,28 @@ matrix:
apt: apt:
packages: [ pbuilder, fakeroot, debhelper ] packages: [ pbuilder, fakeroot, debhelper ]
env:
- MATRIX_EVAL="export DEB_CC=clang-5.0 && export DEB_CXX=clang++-5.0"
script: script:
- utils/travis/pbuilder.sh - utils/travis/pbuilder.sh
- os: linux # - os: linux
#
sudo: required # sudo: required
#
cache: # cache:
timeout: 1000 # timeout: 1000
directories: # directories:
- /var/cache/pbuilder/ccache # - /var/cache/pbuilder/ccache
#
addons: # addons:
apt: # apt:
packages: [ pbuilder, fakeroot, debhelper ] # packages: [ pbuilder, fakeroot, debhelper ]
#
env: # env:
- MATRIX_EVAL="export ARCH=i386" # - MATRIX_EVAL="export ARCH=i386"
#
script: # script:
- env PBUILDER_TIMEOUT=40m TEST_TRUE=true TEST_RUN= utils/travis/pbuilder.sh # - env PBUILDER_TIMEOUT=40m TEST_TRUE=true TEST_RUN= utils/travis/pbuilder.sh
# TODO: Can't bootstrap bionic on trusty host # TODO: Can't bootstrap bionic on trusty host

View File

@ -1,8 +1,8 @@
# ClickHouse release 1.1.54343, 2018-02-05 # ClickHouse release 1.1.54343, 2018-02-05
* An ability to use macros for cluster name definition in distributed DDL queries and constructors of Distributed tables was added: `CREATE TABLE distr ON CLUSTER '{cluster}' (...) ENGINE = Distributed('{cluster}', 'db', 'table')`. * Added macros support for defining cluster names in distributed DDL queries and constructors of Distributed tables: `CREATE TABLE distr ON CLUSTER '{cluster}' (...) ENGINE = Distributed('{cluster}', 'db', 'table')`.
* Now the index is used for conditions like `expr IN (subquery)`. * Now the table index is used for conditions like `expr IN (subquery)`.
* Duplicates processing when added to a Replicated table was improved, now they do not slow down the replication queue execution. * Improved processing of duplicates when inserting to Replicated tables, so they no longer slow down execution of the replication queue.
# ClickHouse release 1.1.54342, 2018-01-22 # ClickHouse release 1.1.54342, 2018-01-22

View File

@ -90,7 +90,9 @@ if (USE_INTERNAL_RDKAFKA_LIBRARY)
endif () endif ()
if (USE_INTERNAL_CAPNP_LIBRARY) if (USE_INTERNAL_CAPNP_LIBRARY)
if (NOT APPLE) # tests never end
set (BUILD_TESTING ${ENABLE_TESTS} CACHE INTERNAL "") set (BUILD_TESTING ${ENABLE_TESTS} CACHE INTERNAL "")
endif ()
set (_save ${CMAKE_CXX_EXTENSIONS}) set (_save ${CMAKE_CXX_EXTENSIONS})
set (CMAKE_CXX_EXTENSIONS) set (CMAKE_CXX_EXTENSIONS)
add_subdirectory (capnproto/c++) add_subdirectory (capnproto/c++)

2
contrib/zookeeper vendored

@ -1 +1 @@
Subproject commit 5aa9e889fe9e739af3c2a00222d9a3a0a57179dd Subproject commit 438afae5af36c5be9c82d074f43a9bb19e0797c0

View File

@ -1,6 +1,6 @@
# This strings autochanged from release_lib.sh: # This strings autochanged from release_lib.sh:
set(VERSION_DESCRIBE v1.1.54350-testing) set(VERSION_DESCRIBE v1.1.54354-testing)
set(VERSION_REVISION 54350) set(VERSION_REVISION 54354)
# end of autochange # end of autochange
set (VERSION_MAJOR 1) set (VERSION_MAJOR 1)

View File

@ -164,8 +164,7 @@ public:
{ {
const auto cond_arg = arguments[i].get(); const auto cond_arg = arguments[i].get();
if (!typeid_cast<const DataTypeUInt8 *>(cond_arg)) if (!typeid_cast<const DataTypeUInt8 *>(cond_arg))
throw Exception{ throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) +
"Illegal type " + cond_arg->getName() + " of argument " + toString(i + 1) +
" of aggregate function " + derived().getName() + ", must be UInt8", " of aggregate function " + derived().getName() + ", must be UInt8",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT}; ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
} }

View File

@ -0,0 +1,6 @@
# TODO: make separate lib datastream, block, ...
#include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
#add_headers_and_sources(clickhouse_client .)
#add_library(clickhouse_client ${SPLIT_SHARED} ${clickhouse_client_headers} ${clickhouse_client_sources})
#target_link_libraries (clickhouse_client clickhouse_common_io ${Poco_Net_LIBRARY})
#target_include_directories (clickhouse_client PRIVATE ${DBMS_INCLUDE_DIR})

View File

@ -65,15 +65,15 @@ public:
bool hasEqualOffsets(const ColumnArray & other) const; bool hasEqualOffsets(const ColumnArray & other) const;
/** More efficient methods of manipulation */ /** More efficient methods of manipulation */
IColumn & getData() { return *data->assumeMutable(); } IColumn & getData() { return data->assumeMutableRef(); }
const IColumn & getData() const { return *data; } const IColumn & getData() const { return *data; }
IColumn & getOffsetsColumn() { return *offsets->assumeMutable(); } IColumn & getOffsetsColumn() { return offsets->assumeMutableRef(); }
const IColumn & getOffsetsColumn() const { return *offsets; } const IColumn & getOffsetsColumn() const { return *offsets; }
Offsets & ALWAYS_INLINE getOffsets() Offsets & ALWAYS_INLINE getOffsets()
{ {
return static_cast<ColumnOffsets &>(*offsets->assumeMutable()).getData(); return static_cast<ColumnOffsets &>(offsets->assumeMutableRef()).getData();
} }
const Offsets & ALWAYS_INLINE getOffsets() const const Offsets & ALWAYS_INLINE getOffsets() const
@ -81,11 +81,9 @@ public:
return static_cast<const ColumnOffsets &>(*offsets).getData(); return static_cast<const ColumnOffsets &>(*offsets).getData();
} }
//MutableColumnPtr getDataMutablePtr() { return data->assumeMutable(); }
const ColumnPtr & getDataPtr() const { return data; } const ColumnPtr & getDataPtr() const { return data; }
ColumnPtr & getDataPtr() { return data; } ColumnPtr & getDataPtr() { return data; }
//MutableColumnPtr getOffsetsMutablePtr() { return offsets->assumeMutable(); }
const ColumnPtr & getOffsetsPtr() const { return offsets; } const ColumnPtr & getOffsetsPtr() const { return offsets; }
ColumnPtr & getOffsetsPtr() { return offsets; } ColumnPtr & getOffsetsPtr() { return offsets; }

View File

@ -133,9 +133,9 @@ public:
const char * deserializeAndInsertFromArena(const char * pos) override const char * deserializeAndInsertFromArena(const char * pos) override
{ {
MutableColumnPtr mutable_data = data->assumeMutable(); auto & mutable_data = data->assumeMutableRef();
auto res = mutable_data->deserializeAndInsertFromArena(pos); auto res = mutable_data.deserializeAndInsertFromArena(pos);
mutable_data->popBack(1); mutable_data.popBack(1);
++s; ++s;
return res; return res;
} }
@ -191,7 +191,7 @@ public:
/// Not part of the common interface. /// Not part of the common interface.
IColumn & getDataColumn() { return *data->assumeMutable(); } IColumn & getDataColumn() { return data->assumeMutableRef(); }
const IColumn & getDataColumn() const { return *data; } const IColumn & getDataColumn() const { return *data; }
//MutableColumnPtr getDataColumnMutablePtr() { return data; } //MutableColumnPtr getDataColumnMutablePtr() { return data; }
const ColumnPtr & getDataColumnPtr() const { return data; } const ColumnPtr & getDataColumnPtr() const { return data; }

View File

@ -121,13 +121,13 @@ std::vector<MutableColumnPtr> ColumnFunction::scatter(IColumn::ColumnIndex num_c
void ColumnFunction::insertDefault() void ColumnFunction::insertDefault()
{ {
for (auto & column : captured_columns) for (auto & column : captured_columns)
column.column->assumeMutable()->insertDefault(); column.column->assumeMutableRef().insertDefault();
++size_; ++size_;
} }
void ColumnFunction::popBack(size_t n) void ColumnFunction::popBack(size_t n)
{ {
for (auto & column : captured_columns) for (auto & column : captured_columns)
column.column->assumeMutable()->popBack(n); column.column->assumeMutableRef().popBack(n);
size_ -= n; size_ -= n;
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
class IFunctionBase; class IFunctionBase;

View File

@ -81,17 +81,16 @@ public:
/// Return the column that represents values. /// Return the column that represents values.
IColumn & getNestedColumn() { return *nested_column->assumeMutable(); } IColumn & getNestedColumn() { return nested_column->assumeMutableRef(); }
const IColumn & getNestedColumn() const { return *nested_column; } const IColumn & getNestedColumn() const { return *nested_column; }
//ColumnPtr & getNestedColumnPtr() { return nested_column->assumeMutable(); }
const ColumnPtr & getNestedColumnPtr() const { return nested_column; } const ColumnPtr & getNestedColumnPtr() const { return nested_column; }
/// Return the column that represents the byte map. /// Return the column that represents the byte map.
//ColumnPtr & getNullMapColumnPtr() { return null_map; } //ColumnPtr & getNullMapColumnPtr() { return null_map; }
const ColumnPtr & getNullMapColumnPtr() const { return null_map; } const ColumnPtr & getNullMapColumnPtr() const { return null_map; }
ColumnUInt8 & getNullMapColumn() { return static_cast<ColumnUInt8 &>(*null_map->assumeMutable()); } ColumnUInt8 & getNullMapColumn() { return static_cast<ColumnUInt8 &>(null_map->assumeMutableRef()); }
const ColumnUInt8 & getNullMapColumn() const { return static_cast<const ColumnUInt8 &>(*null_map); } const ColumnUInt8 & getNullMapColumn() const { return static_cast<const ColumnUInt8 &>(*null_map); }
NullMap & getNullMapData() { return getNullMapColumn().getData(); } NullMap & getNullMapData() { return getNullMapColumn().getData(); }

View File

@ -81,7 +81,7 @@ void ColumnTuple::insert(const Field & x)
throw Exception("Cannot insert value of different size into tuple", ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE); throw Exception("Cannot insert value of different size into tuple", ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE);
for (size_t i = 0; i < tuple_size; ++i) for (size_t i = 0; i < tuple_size; ++i)
columns[i]->assumeMutable()->insert(tuple[i]); columns[i]->assumeMutableRef().insert(tuple[i]);
} }
void ColumnTuple::insertFrom(const IColumn & src_, size_t n) void ColumnTuple::insertFrom(const IColumn & src_, size_t n)
@ -93,19 +93,19 @@ void ColumnTuple::insertFrom(const IColumn & src_, size_t n)
throw Exception("Cannot insert value of different size into tuple", ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE); throw Exception("Cannot insert value of different size into tuple", ErrorCodes::CANNOT_INSERT_VALUE_OF_DIFFERENT_SIZE_INTO_TUPLE);
for (size_t i = 0; i < tuple_size; ++i) for (size_t i = 0; i < tuple_size; ++i)
columns[i]->assumeMutable()->insertFrom(*src.columns[i], n); columns[i]->assumeMutableRef().insertFrom(*src.columns[i], n);
} }
void ColumnTuple::insertDefault() void ColumnTuple::insertDefault()
{ {
for (auto & column : columns) for (auto & column : columns)
column->assumeMutable()->insertDefault(); column->assumeMutableRef().insertDefault();
} }
void ColumnTuple::popBack(size_t n) void ColumnTuple::popBack(size_t n)
{ {
for (auto & column : columns) for (auto & column : columns)
column->assumeMutable()->popBack(n); column->assumeMutableRef().popBack(n);
} }
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
@ -120,7 +120,7 @@ StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char con
const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos) const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos)
{ {
for (auto & column : columns) for (auto & column : columns)
pos = column->assumeMutable()->deserializeAndInsertFromArena(pos); pos = column->assumeMutableRef().deserializeAndInsertFromArena(pos);
return pos; return pos;
} }
@ -135,7 +135,7 @@ void ColumnTuple::insertRangeFrom(const IColumn & src, size_t start, size_t leng
{ {
const size_t tuple_size = columns.size(); const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i) for (size_t i = 0; i < tuple_size; ++i)
columns[i]->assumeMutable()->insertRangeFrom( columns[i]->assumeMutableRef().insertRangeFrom(
*static_cast<const ColumnTuple &>(src).columns[i], *static_cast<const ColumnTuple &>(src).columns[i],
start, length); start, length);
} }

View File

@ -65,13 +65,11 @@ public:
size_t tupleSize() const { return columns.size(); } size_t tupleSize() const { return columns.size(); }
const IColumn & getColumn(size_t idx) const { return *columns[idx]; } const IColumn & getColumn(size_t idx) const { return *columns[idx]; }
IColumn & getColumn(size_t idx) { return *columns[idx]->assumeMutable(); } IColumn & getColumn(size_t idx) { return columns[idx]->assumeMutableRef(); }
const Columns & getColumns() const { return columns; } const Columns & getColumns() const { return columns; }
const ColumnPtr & getColumnPtr(size_t idx) const { return columns[idx]; } const ColumnPtr & getColumnPtr(size_t idx) const { return columns[idx]; }
//ColumnPtr & getColumnPtr(size_t idx) { return columns[idx]; }
//MutableColumnPtr getColumnMutablePtr(size_t idx) { return columns[idx]->assumeMutable(); }
}; };

View File

@ -172,6 +172,11 @@ public:
{ {
return const_cast<COWPtr*>(this)->getPtr(); return const_cast<COWPtr*>(this)->getPtr();
} }
Derived & assumeMutableRef() const
{
return const_cast<Derived &>(*derived());
}
}; };
@ -235,6 +240,6 @@ public:
* 3. Store subobjects as immutable ptrs. Implement copy-constructor to do shallow copy. * 3. Store subobjects as immutable ptrs. Implement copy-constructor to do shallow copy.
* But reimplement 'mutate' method, so it will call 'mutate' of all subobjects (do deep mutate). * But reimplement 'mutate' method, so it will call 'mutate' of all subobjects (do deep mutate).
* It will guarantee, that mutable object have all subobjects unshared. * It will guarantee, that mutable object have all subobjects unshared.
* From non-const method, you can modify subobjects with 'assumeMutable' method. * From non-const method, you can modify subobjects with 'assumeMutableRef' method.
* Drawback: it's more complex than other solutions. * Drawback: it's more complex than other solutions.
*/ */

View File

@ -367,6 +367,8 @@ namespace ErrorCodes
extern const int CANNOT_ASSIGN_OPTIMIZE = 388; extern const int CANNOT_ASSIGN_OPTIMIZE = 388;
extern const int INSERT_WAS_DEDUPLICATED = 389; extern const int INSERT_WAS_DEDUPLICATED = 389;
extern const int CANNOT_GET_CREATE_TABLE_QUERY = 390; extern const int CANNOT_GET_CREATE_TABLE_QUERY = 390;
extern const int EXTERNAL_LIBRARY_ERROR = 391;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;

View File

@ -40,9 +40,6 @@ struct UInt128
bool inline operator> (const UInt128 rhs) const { return tuple() > rhs.tuple(); } bool inline operator> (const UInt128 rhs) const { return tuple() > rhs.tuple(); }
bool inline operator>= (const UInt128 rhs) const { return tuple() >= rhs.tuple(); } bool inline operator>= (const UInt128 rhs) const { return tuple() >= rhs.tuple(); }
/** Types who are stored at the moment in the database have no more than 64bits and can be handle
* inside an unique UInt64.
*/
template <typename T> bool inline operator== (const T rhs) const { return *this == UInt128(rhs); } template <typename T> bool inline operator== (const T rhs) const { return *this == UInt128(rhs); }
template <typename T> bool inline operator!= (const T rhs) const { return *this != UInt128(rhs); } template <typename T> bool inline operator!= (const T rhs) const { return *this != UInt128(rhs); }
template <typename T> bool inline operator>= (const T rhs) const { return *this >= UInt128(rhs); } template <typename T> bool inline operator>= (const T rhs) const { return *this >= UInt128(rhs); }

View File

@ -59,7 +59,7 @@
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058 #define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060 #define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226 #define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54311 #define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change. /// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226 #define DBMS_TCP_PROTOCOL_VERSION 54226

View File

@ -29,6 +29,7 @@ STRONG_TYPEDEF(TupleBackend, Tuple); /// Array and Tuple are different types wit
/** 32 is enough. Round number is used for alignment and for better arithmetic inside std::vector. /** 32 is enough. Round number is used for alignment and for better arithmetic inside std::vector.
* NOTE: Actually, sizeof(std::string) is 32 when using libc++, so Field is 40 bytes.
*/ */
#define DBMS_MIN_FIELD_SIZE 32 #define DBMS_MIN_FIELD_SIZE 32

View File

@ -18,7 +18,7 @@ namespace DB
std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what) std::ostream & operator<<(std::ostream & stream, const IBlockInputStream & what)
{ {
stream << "IBlockInputStream(id = " << what.getID() << ", name = " << what.getName() << ")"; stream << "IBlockInputStream(name = " << what.getName() << ")";
//what.dumpTree(stream); // todo: set const //what.dumpTree(stream); // todo: set const
return stream; return stream;
} }
@ -115,7 +115,6 @@ std::ostream & operator<<(std::ostream & stream, const Connection::Packet & what
std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what) std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what)
{ {
stream << "SubqueryForSet(source = " << what.source stream << "SubqueryForSet(source = " << what.source
<< ", source_sample = " << what.source_sample
// TODO: << ", set = " << what.set << ", join = " << what.join // TODO: << ", set = " << what.set << ", join = " << what.join
<< ", table = " << what.table << ", table = " << what.table
<< ")"; << ")";

View File

@ -24,11 +24,11 @@ public:
String getName() const override { return "AddingConstColumn"; } String getName() const override { return "AddingConstColumn"; }
String getID() const override Block getHeader() const override
{ {
std::stringstream res; Block res = children.back()->getHeader();
res << "AddingConstColumn(" << children.back()->getID() << ")"; res.insert({data_type->createColumn(), data_type, column_name});
return res.str(); return res;
} }
protected: protected:

View File

@ -14,6 +14,11 @@ namespace ProfileEvents
namespace DB namespace DB
{ {
Block AggregatingBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
Block AggregatingBlockInputStream::readImpl() Block AggregatingBlockInputStream::readImpl()
{ {
@ -42,7 +47,7 @@ Block AggregatingBlockInputStream::readImpl()
if (!isCancelled()) if (!isCancelled())
{ {
/// Flush data in the RAM to disk also. It's easier. /// Flush data in the RAM to disk also. It's easier than merging on-disk and RAM data.
if (data_variants->size()) if (data_variants->size())
aggregator.writeToTemporaryFile(*data_variants); aggregator.writeToTemporaryFile(*data_variants);
} }
@ -63,9 +68,8 @@ Block AggregatingBlockInputStream::readImpl()
} }
} }
Block res;
if (isCancelled() || !impl) if (isCancelled() || !impl)
return res; return {};
return impl->read(); return impl->read();
} }

View File

@ -30,12 +30,7 @@ public:
String getName() const override { return "Aggregating"; } String getName() const override { return "Aggregating"; }
String getID() const override Block getHeader() const override;
{
std::stringstream res;
res << "Aggregating(" << children.back()->getID() << ", " << aggregator.getID() << ")";
return res.str();
}
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -34,8 +34,6 @@ Block AggregatingSortedBlockInputStream::readImpl()
/// Additional initialization. /// Additional initialization.
if (next_key.empty()) if (next_key.empty())
{ {
next_key.columns.resize(description.size());
/// Fill in the column numbers that need to be aggregated. /// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
{ {
@ -88,7 +86,6 @@ void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, s
if (current_key.empty()) /// The first key encountered. if (current_key.empty()) /// The first key encountered.
{ {
current_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current); setPrimaryKeyRef(current_key, current);
key_differs = true; key_differs = true;
} }

View File

@ -28,26 +28,8 @@ public:
String getName() const override { return "AggregatingSorted"; } String getName() const override { return "AggregatingSorted"; }
String getID() const override
{
std::stringstream res;
res << "AggregatingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
bool isGroupedOutput() const override { return true; } bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
protected: protected:
/// Can return 1 more records than max_block_size. /// Can return 1 more records than max_block_size.

View File

@ -35,13 +35,6 @@ public:
String getName() const override { return "Asynchronous"; } String getName() const override { return "Asynchronous"; }
String getID() const override
{
std::stringstream res;
res << "Asynchronous(" << children.back()->getID() << ")";
return res.str();
}
void readPrefix() override void readPrefix() override
{ {
/// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread. /// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread.
@ -80,6 +73,9 @@ public:
} }
Block getHeader() const override { return children.at(0)->getHeader(); }
~AsynchronousBlockInputStream() override ~AsynchronousBlockInputStream() override
{ {
if (started) if (started)

View File

@ -5,7 +5,7 @@
namespace DB namespace DB
{ {
/** Adds to one thread additional block information that is specified /** Adds to one stream additional block information that is specified
* as the constructor parameter. * as the constructor parameter.
*/ */
class BlockExtraInfoInputStream : public IProfilingBlockInputStream class BlockExtraInfoInputStream : public IProfilingBlockInputStream
@ -24,12 +24,7 @@ public:
String getName() const override { return "BlockExtraInfoInput"; } String getName() const override { return "BlockExtraInfoInput"; }
String getID() const override Block getHeader() const override { return children.back()->getHeader(); }
{
std::stringstream res;
res << "BlockExtraInfoInput(" << children.back()->getID() << ")";
return res.str();
}
protected: protected:
Block readImpl() override Block readImpl() override

View File

@ -21,7 +21,6 @@ struct BlockIO
BlockInputStreamPtr in; BlockInputStreamPtr in;
BlockOutputStreamPtr out; BlockOutputStreamPtr out;
Block in_sample; /// Example of a block to be read from `in`.
Block out_sample; /// Example of a block to be written to `out`. Block out_sample; /// Example of a block to be written to `out`.
/// Callbacks for query logging could be set here. /// Callbacks for query logging could be set here.
@ -51,7 +50,6 @@ struct BlockIO
process_list_entry = rhs.process_list_entry; process_list_entry = rhs.process_list_entry;
in = rhs.in; in = rhs.in;
out = rhs.out; out = rhs.out;
in_sample = rhs.in_sample;
out_sample = rhs.out_sample; out_sample = rhs.out_sample;
finish_callback = rhs.finish_callback; finish_callback = rhs.finish_callback;

View File

@ -29,15 +29,10 @@ public:
String getName() const override { return "BlockInputStreamFromRowInputStream"; } String getName() const override { return "BlockInputStreamFromRowInputStream"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
RowInputStreamPtr & getRowInput() { return row_input; } RowInputStreamPtr & getRowInput() { return row_input; }
Block getHeader() const override { return sample; }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -22,13 +22,6 @@ public:
String getName() const override { return "BlocksList"; } String getName() const override { return "BlocksList"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
protected: protected:
Block readImpl() override Block readImpl() override
{ {

View File

@ -20,11 +20,6 @@ String CastTypeBlockInputStream::getName() const
return "CastType"; return "CastType";
} }
String CastTypeBlockInputStream::getID() const
{
return "CastType(" + children.back()->getID() + ")";
}
Block CastTypeBlockInputStream::readImpl() Block CastTypeBlockInputStream::readImpl()
{ {
Block block = children.back()->read(); Block block = children.back()->read();

View File

@ -17,7 +17,7 @@ public:
String getName() const override; String getName() const override;
String getID() const override; Block getHeader() const override { return ref_definition; }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -28,26 +28,11 @@ public:
String getName() const override { return "CollapsingFinal"; } String getName() const override { return "CollapsingFinal"; }
String getID() const override
{
std::stringstream res;
res << "CollapsingFinal(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", sign_column, " << sign_column_name << ")";
return res.str();
}
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; } const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -27,7 +27,7 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
{ {
if (i != 0) if (i != 0)
s << ", "; s << ", ";
s << applyVisitor(FieldVisitorToString(), (*current_key.columns[i])[current_key.row_num]); s << applyVisitor(FieldVisitorToString(), (*(*current_key.columns)[i])[current_key.row_num]);
} }
s << ")."; s << ").";
@ -53,10 +53,10 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
LOG_INFO(log, "All rows collapsed"); LOG_INFO(log, "All rows collapsed");
++merged_rows; ++merged_rows;
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
++merged_rows; ++merged_rows;
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num); merged_columns[i]->insertFrom(*(*last_negative.columns)[i], last_negative.row_num);
if (out_row_sources_buf) if (out_row_sources_buf)
{ {
@ -72,7 +72,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{ {
++merged_rows; ++merged_rows;
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num); merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
if (out_row_sources_buf) if (out_row_sources_buf)
current_row_sources[first_negative_pos].setSkipFlag(false); current_row_sources[first_negative_pos].setSkipFlag(false);
@ -82,7 +82,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{ {
++merged_rows; ++merged_rows;
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
if (out_row_sources_buf) if (out_row_sources_buf)
current_row_sources[last_positive_pos].setSkipFlag(false); current_row_sources[last_positive_pos].setSkipFlag(false);
@ -124,13 +124,8 @@ Block CollapsingSortedBlockInputStream::readImpl()
/// Additional initialization. /// Additional initialization.
if (first_negative.empty()) if (first_negative.empty())
{
first_negative.columns.resize(num_columns);
last_negative.columns.resize(num_columns);
last_positive.columns.resize(num_columns);
sign_column_number = header.getPositionByName(sign_column); sign_column_number = header.getPositionByName(sign_column);
}
merge(merged_columns, queue); merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
@ -147,12 +142,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
SortCursor current = queue.top(); SortCursor current = queue.top();
if (current_key.empty()) if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current); setPrimaryKeyRef(current_key, current);
}
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos]; Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setPrimaryKeyRef(next_key, current); setPrimaryKeyRef(next_key, current);

View File

@ -25,8 +25,7 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
public: public:
CollapsingSortedBlockInputStream( CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_) , sign_column(sign_column_)
{ {
@ -34,23 +33,6 @@ public:
String getName() const override { return "CollapsingSorted"; } String getName() const override { return "CollapsingSorted"; }
String getID() const override
{
std::stringstream res;
res << "CollapsingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", sign_column, " << sign_column << ")";
return res.str();
}
protected: protected:
/// Can return 1 more records than max_block_size. /// Can return 1 more records than max_block_size.
Block readImpl() override; Block readImpl() override;

View File

@ -30,19 +30,6 @@ ColumnGathererStream::ColumnGathererStream(
} }
String ColumnGathererStream::getID() const
{
std::stringstream res;
res << getName() << "(";
for (size_t i = 0; i < children.size(); ++i)
res << (i == 0 ? "" : ", " ) << children[i]->getID();
res << ")";
return res.str();
}
void ColumnGathererStream::init() void ColumnGathererStream::init()
{ {
sources.reserve(children.size()); sources.reserve(children.size());
@ -107,13 +94,13 @@ void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
} }
catch (Exception & e) catch (Exception & e)
{ {
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getID() + ", part " + toString(source_num)); e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
throw; throw;
} }
if (0 == source.size) if (0 == source.size)
{ {
throw Exception("Fetched block is empty. Stream " + children[source_num]->getID() + ", part " + toString(source_num), throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
ErrorCodes::RECEIVED_EMPTY_DATA); ErrorCodes::RECEIVED_EMPTY_DATA);
} }
} }

View File

@ -61,12 +61,12 @@ public:
String getName() const override { return "ColumnGatherer"; } String getName() const override { return "ColumnGatherer"; }
String getID() const override;
Block readImpl() override; Block readImpl() override;
void readSuffixImpl() override; void readSuffixImpl() override;
Block getHeader() const override { return children.at(0)->getHeader(); }
/// for use in implementations of IColumn::gather() /// for use in implementations of IColumn::gather()
template <typename Column> template <typename Column>
void gather(Column & column_res); void gather(Column & column_res);

View File

@ -22,24 +22,7 @@ public:
String getName() const override { return "Concat"; } String getName() const override { return "Concat"; }
String getID() const override Block getHeader() const override { return children.at(0)->getHeader(); }
{
std::stringstream res;
res << "Concat(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Let's assume that the order of concatenation of blocks does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
protected: protected:
Block readImpl() override Block readImpl() override

View File

@ -35,24 +35,7 @@ public:
String getName() const override { return "CreatingSets"; } String getName() const override { return "CreatingSets"; }
String getID() const override Block getHeader() const override { return children.back()->getHeader(); }
{
std::stringstream res;
res << "CreatingSets(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Let's assume that the order of creating sets does not matter.
std::sort(children_ids.begin(), children_ids.end() - 1);
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
/// Takes `totals` only from the main source, not from subquery sources. /// Takes `totals` only from the main source, not from subquery sources.
const Block & getTotals() override; const Block & getTotals() override;

View File

@ -18,13 +18,6 @@ DistinctBlockInputStream::DistinctBlockInputStream(const BlockInputStreamPtr & i
children.push_back(input); children.push_back(input);
} }
String DistinctBlockInputStream::getID() const
{
std::stringstream res;
res << "Distinct(" << children.back()->getID() << ")";
return res.str();
}
Block DistinctBlockInputStream::readImpl() Block DistinctBlockInputStream::readImpl()
{ {
/// Execute until end of stream or until /// Execute until end of stream or until

View File

@ -22,7 +22,7 @@ public:
String getName() const override { return "Distinct"; } String getName() const override { return "Distinct"; }
String getID() const override; Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -19,13 +19,6 @@ DistinctSortedBlockInputStream::DistinctSortedBlockInputStream(const BlockInputS
children.push_back(input); children.push_back(input);
} }
String DistinctSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "DistinctSorted(" << children.back()->getID() << ")";
return res.str();
}
Block DistinctSortedBlockInputStream::readImpl() Block DistinctSortedBlockInputStream::readImpl()
{ {
/// Execute until end of stream or until /// Execute until end of stream or until

View File

@ -25,7 +25,7 @@ public:
String getName() const override { return "DistinctSorted"; } String getName() const override { return "DistinctSorted"; }
String getID() const override; Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -13,13 +13,6 @@ ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr
String ExpressionBlockInputStream::getName() const { return "Expression"; } String ExpressionBlockInputStream::getName() const { return "Expression"; }
String ExpressionBlockInputStream::getID() const
{
std::stringstream res;
res << "Expression(" << children.back()->getID() << ", " << expression->getID() << ")";
return res.str();
}
const Block & ExpressionBlockInputStream::getTotals() const Block & ExpressionBlockInputStream::getTotals()
{ {
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back())) if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
@ -31,14 +24,19 @@ const Block & ExpressionBlockInputStream::getTotals()
return totals; return totals;
} }
Block ExpressionBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
expression->execute(res);
return res;
}
Block ExpressionBlockInputStream::readImpl() Block ExpressionBlockInputStream::readImpl()
{ {
Block res = children.back()->read(); Block res = children.back()->read();
if (!res) if (!res)
return res; return res;
expression->execute(res); expression->execute(res);
return res; return res;
} }

View File

@ -22,8 +22,8 @@ public:
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_); ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override; String getName() const override;
String getID() const override;
const Block & getTotals() override; const Block & getTotals() override;
Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -23,24 +23,36 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
children.push_back(input); children.push_back(input);
} }
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_) FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name)
: expression(expression_), filter_column(-1), filter_column_name(filter_column_name_) : expression(expression_)
{ {
children.push_back(input); children.push_back(input);
/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);
filter_column = header.getPositionByName(filter_column_name);
/// Isn't the filter already constant?
ColumnPtr column = header.safeGetByPosition(filter_column).column;
if (column)
constant_filter_description = ConstantFilterDescription(*column);
if (!constant_filter_description.always_false
&& !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
auto & header_filter_elem = header.getByPosition(filter_column);
header_filter_elem.column = header_filter_elem.type->createColumnConst(header.rows(), UInt64(1));
}
} }
String FilterBlockInputStream::getName() const { return "Filter"; } String FilterBlockInputStream::getName() const { return "Filter"; }
String FilterBlockInputStream::getID() const
{
std::stringstream res;
res << "Filter(" << children.back()->getID() << ", " << expression->getID() << ", " << filter_column << ", " << filter_column_name << ")";
return res.str();
}
const Block & FilterBlockInputStream::getTotals() const Block & FilterBlockInputStream::getTotals()
{ {
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back())) if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
@ -53,37 +65,18 @@ const Block & FilterBlockInputStream::getTotals()
} }
Block FilterBlockInputStream::getHeader() const
{
return header;
}
Block FilterBlockInputStream::readImpl() Block FilterBlockInputStream::readImpl()
{ {
Block res; Block res;
if (is_first)
{
is_first = false;
const Block & sample_block = expression->getSampleBlock();
/// Find the current position of the filter column in the block.
/** sample_block has the result structure of evaluating the expression.
* But this structure does not necessarily match expression->execute(res) below,
* because the expression can be applied to a block that also contains additional,
* columns unnecessary for this expression, but needed later, in the next stages of the query execution pipeline.
* There will be no such columns in sample_block.
* Therefore, the position of the filter column in it can be different.
*/
ssize_t filter_column_in_sample_block = filter_column;
if (filter_column_in_sample_block == -1)
filter_column_in_sample_block = sample_block.getPositionByName(filter_column_name);
/// Let's check if the filter column is a constant containing 0 or 1.
ColumnPtr column = sample_block.safeGetByPosition(filter_column_in_sample_block).column;
if (column)
constant_filter_description = ConstantFilterDescription(*column);
if (constant_filter_description.always_false) if (constant_filter_description.always_false)
return res; return res;
}
/// Until non-empty block after filtering or end of stream. /// Until non-empty block after filtering or end of stream.
while (1) while (1)
@ -97,10 +90,6 @@ Block FilterBlockInputStream::readImpl()
if (constant_filter_description.always_true) if (constant_filter_description.always_true)
return res; return res;
/// Find the current position of the filter column in the block.
if (filter_column == -1)
filter_column = res.getPositionByName(filter_column_name);
size_t columns = res.columns(); size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column; ColumnPtr column = res.safeGetByPosition(filter_column).column;

View File

@ -25,18 +25,16 @@ public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_); FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
String getName() const override; String getName() const override;
String getID() const override;
const Block & getTotals() override; const Block & getTotals() override;
Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;
private: private:
ExpressionActionsPtr expression; ExpressionActionsPtr expression;
Block header;
ssize_t filter_column; ssize_t filter_column;
String filter_column_name;
bool is_first = true;
ConstantFilterDescription constant_filter_description; ConstantFilterDescription constant_filter_description;
}; };

View File

@ -3,16 +3,16 @@
namespace DB namespace DB
{ {
String FilterColumnsBlockInputStream::getID() const Block FilterColumnsBlockInputStream::getHeader() const
{ {
std::stringstream res; Block block = children.back()->getHeader();
res << "FilterColumnsBlockInputStream(" << children.back()->getID(); Block filtered;
for (const auto & it : columns_to_save) for (const auto & it : columns_to_save)
res << ", " << it; if (throw_if_column_not_found || block.has(it))
filtered.insert(std::move(block.getByName(it)));
res << ")"; return filtered;
return res.str();
} }
Block FilterColumnsBlockInputStream::readImpl() Block FilterColumnsBlockInputStream::readImpl()

View File

@ -24,7 +24,7 @@ public:
return "FilterColumnsBlockInputStream"; return "FilterColumnsBlockInputStream";
} }
String getID() const override; Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -59,7 +59,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
if (name == "Native") if (name == "Native")
{ {
return std::make_shared<NativeBlockInputStream>(buf); return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
} }
else if (name == "RowBinary") else if (name == "RowBinary")
{ {

View File

@ -98,9 +98,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num) if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i); unmodified_column_numbers.push_back(i);
if (current_subgroup_newest_row.empty())
current_subgroup_newest_row.columns.resize(num_columns);
} }
merge(merged_columns, queue); merge(merged_columns, queue);
@ -257,14 +254,14 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
} }
else else
merged_columns[value_column_num]->insertFrom( merged_columns[value_column_num]->insertFrom(
*current_subgroup_newest_row.columns[value_column_num], current_subgroup_newest_row.row_num); *(*current_subgroup_newest_row.columns)[value_column_num], current_subgroup_newest_row.row_num);
} }
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row) void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{ {
if (aggregate_state_created) if (aggregate_state_created)
current_pattern->function->add(place_for_aggregate_state.data(), &row.columns[value_column_num], row.row_num, nullptr); current_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
} }
} }

View File

@ -135,23 +135,6 @@ public:
String getName() const override { return "GraphiteRollupSorted"; } String getName() const override { return "GraphiteRollupSorted"; }
String getID() const override
{
std::stringstream res;
res << "GraphiteRollupSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
~GraphiteRollupSortedBlockInputStream() ~GraphiteRollupSortedBlockInputStream()
{ {
if (aggregate_state_created) if (aggregate_state_created)

View File

@ -64,6 +64,7 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
ostr << String(indent, ' ') << getName(); ostr << String(indent, ' ') << getName();
if (multiplier > 1) if (multiplier > 1)
ostr << " × " << multiplier; ostr << " × " << multiplier;
// ostr << ": " << getHeader().dumpStructure();
ostr << std::endl; ostr << std::endl;
++indent; ++indent;
@ -125,13 +126,5 @@ void IBlockInputStream::getLeavesImpl(BlockInputStreams & res, const BlockInputS
(*it)->getLeavesImpl(res, *it); (*it)->getLeavesImpl(res, *it);
} }
/// By default all instances is different streams
String IBlockInputStream::getID() const
{
std::stringstream res;
res << getName() << "(" << this << ")";
return res.str();
};
} }

View File

@ -48,6 +48,12 @@ class IBlockInputStream : private boost::noncopyable
public: public:
IBlockInputStream() {} IBlockInputStream() {}
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* It is guaranteed that method "read" returns blocks of exactly that structure.
*/
virtual Block getHeader() const = 0;
/** Read next block. /** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false). * If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously. * NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
@ -76,14 +82,6 @@ public:
*/ */
virtual String getName() const = 0; virtual String getName() const = 0;
/** The unique identifier of the pipeline part of the query execution.
* Sources with the same identifier are considered identical
* (producing the same data), and can be replaced by one source
* if several queries are executed simultaneously.
* If the source can not be glued together with any other - return the object's address as an identifier.
*/
virtual String getID() const;
/// If this stream generates data in grouped by some keys, return true. /// If this stream generates data in grouped by some keys, return true.
virtual bool isGroupedOutput() const { return false; } virtual bool isGroupedOutput() const { return false; }
/// If this stream generates data in order by some keys, return true. /// If this stream generates data in order by some keys, return true.

View File

@ -1,20 +1,16 @@
#pragma once #pragma once
#include <Parsers/ASTInsertQuery.h> #include <Parsers/IAST.h>
#include <Interpreters/Context.h>
#include <IO/ConcatReadBuffer.h>
#include <DataStreams/IProfilingBlockInputStream.h> #include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/BlockIO.h>
#include <cstddef> #include <cstddef>
#include <memory>
namespace DB namespace DB
{ {
namespace ErrorCodes struct BlockIO;
{ class Context;
extern const int LOGICAL_ERROR;
}
/** Prepares an input stream which produce data containing in INSERT query /** Prepares an input stream which produce data containing in INSERT query
* Head of inserting data could be stored in INSERT ast directly * Head of inserting data could be stored in INSERT ast directly
@ -23,7 +19,6 @@ namespace ErrorCodes
class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
{ {
public: public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context); InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);
Block readImpl() override { return res_stream->read(); } Block readImpl() override { return res_stream->read(); }
@ -31,10 +26,10 @@ public:
void readSuffixImpl() override { return res_stream->readSuffix(); } void readSuffixImpl() override { return res_stream->readSuffix(); }
String getName() const override { return "InputStreamFromASTInsertQuery"; } String getName() const override { return "InputStreamFromASTInsertQuery"; }
String getID() const override { return "InputStreamFromASTInsertQuery(" + toString(std::intptr_t(this)) + ")"; }
Block getHeader() const override { return res_stream->getHeader(); }
private: private:
std::unique_ptr<ReadBuffer> input_buffer_ast_part; std::unique_ptr<ReadBuffer> input_buffer_ast_part;
std::unique_ptr<ReadBuffer> input_buffer_contacenated; std::unique_ptr<ReadBuffer> input_buffer_contacenated;

View File

@ -15,14 +15,13 @@ class LazyBlockInputStream : public IProfilingBlockInputStream
public: public:
using Generator = std::function<BlockInputStreamPtr()>; using Generator = std::function<BlockInputStreamPtr()>;
LazyBlockInputStream(Generator generator_) LazyBlockInputStream(const Block & header_, Generator generator_)
: generator(std::move(generator_)) : header(header_), generator(std::move(generator_))
{ {
} }
LazyBlockInputStream(const char * name_, Generator generator_) LazyBlockInputStream(const char * name_, const Block & header_, Generator generator_)
: name(name_) : name(name_), header(header_), generator(std::move(generator_))
, generator(std::move(generator_))
{ {
} }
@ -34,6 +33,11 @@ public:
IProfilingBlockInputStream::cancel(); IProfilingBlockInputStream::cancel();
} }
Block getHeader() const override
{
return header;
}
protected: protected:
Block readImpl() override Block readImpl() override
{ {
@ -89,6 +93,7 @@ protected:
private: private:
const char * name = "Lazy"; const char * name = "Lazy";
Block header;
Generator generator; Generator generator;
BlockInputStreamPtr input; BlockInputStreamPtr input;

View File

@ -21,12 +21,7 @@ public:
String getName() const override { return "Limit"; } String getName() const override { return "Limit"; }
String getID() const override Block getHeader() const override { return children.at(0)->getHeader(); }
{
std::stringstream res;
res << "Limit(" << children.back()->getID() << ", " << limit << ", " << offset << ")";
return res.str();
}
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -3,9 +3,9 @@
#include <DataStreams/IProfilingBlockInputStream.h> #include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/HashTable/HashMap.h> #include <Common/HashTable/HashMap.h>
#include <Common/SipHash.h>
#include <Common/UInt128.h> #include <Common/UInt128.h>
namespace DB namespace DB
{ {
@ -22,6 +22,8 @@ public:
String getName() const override { return "LimitBy"; } String getName() const override { return "LimitBy"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -15,11 +15,9 @@ String MaterializingBlockInputStream::getName() const
return "Materializing"; return "Materializing";
} }
String MaterializingBlockInputStream::getID() const Block MaterializingBlockInputStream::getHeader() const
{ {
std::stringstream res; return materializeBlock(children.back()->getHeader());
res << "Materializing(" << children.back()->getID() << ")";
return res.str();
} }
Block MaterializingBlockInputStream::readImpl() Block MaterializingBlockInputStream::readImpl()

View File

@ -12,7 +12,7 @@ class MaterializingBlockInputStream : public IProfilingBlockInputStream
public: public:
MaterializingBlockInputStream(const BlockInputStreamPtr & input); MaterializingBlockInputStream(const BlockInputStreamPtr & input);
String getName() const override; String getName() const override;
String getID() const override; Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -155,7 +155,7 @@ Block MergeSortingBlockInputStream::readImpl()
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_) Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, size_t limit_)
: blocks(blocks_), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_) : blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{ {
Blocks nonempty_blocks; Blocks nonempty_blocks;
for (const auto & block : blocks) for (const auto & block : blocks)

View File

@ -33,17 +33,19 @@ public:
size_t max_merged_block_size_, size_t limit_ = 0); size_t max_merged_block_size_, size_t limit_ = 0);
String getName() const override { return "MergeSortingBlocks"; } String getName() const override { return "MergeSortingBlocks"; }
String getID() const override { return getName(); }
bool isGroupedOutput() const override { return true; } bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; } const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return header; }
protected: protected:
Block readImpl() override; Block readImpl() override;
private: private:
Blocks & blocks; Blocks & blocks;
Block header;
SortDescription description; SortDescription description;
size_t max_merged_block_size; size_t max_merged_block_size;
size_t limit; size_t limit;
@ -80,22 +82,12 @@ public:
String getName() const override { return "MergeSorting"; } String getName() const override { return "MergeSorting"; }
String getID() const override
{
std::stringstream res;
res << "MergeSorting(" << children.back()->getID();
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
bool isGroupedOutput() const override { return true; } bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; } const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
Block readImpl() override; Block readImpl() override;
@ -129,7 +121,7 @@ private:
BlockInputStreamPtr block_in; BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path) TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in)) {} : file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, 0)) {}
}; };
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs; std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;

View File

@ -6,6 +6,11 @@
namespace DB namespace DB
{ {
Block MergingAggregatedBlockInputStream::getHeader() const
{
return aggregator.getHeader(final);
}
Block MergingAggregatedBlockInputStream::readImpl() Block MergingAggregatedBlockInputStream::readImpl()
{ {

View File

@ -22,12 +22,7 @@ public:
String getName() const override { return "MergingAggregated"; } String getName() const override { return "MergingAggregated"; }
String getID() const override Block getHeader() const override;
{
std::stringstream res;
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator.getID() << ")";
return res.str();
}
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -90,14 +90,9 @@ MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficie
} }
String MergingAggregatedMemoryEfficientBlockInputStream::getID() const Block MergingAggregatedMemoryEfficientBlockInputStream::getHeader() const
{ {
std::stringstream res; return aggregator.getHeader(final);
res << "MergingAggregatedMemoryEfficient(" << aggregator.getID();
for (size_t i = 0, size = children.size(); i < size; ++i)
res << ", " << children.back()->getID();
res << ")";
return res.str();
} }

View File

@ -67,8 +67,6 @@ public:
String getName() const override { return "MergingAggregatedMemoryEfficient"; } String getName() const override { return "MergingAggregatedMemoryEfficient"; }
String getID() const override;
/// Sends the request (initiates calculations) earlier than `read`. /// Sends the request (initiates calculations) earlier than `read`.
void readPrefix() override; void readPrefix() override;
@ -80,6 +78,8 @@ public:
*/ */
void cancel() override; void cancel() override;
Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -24,28 +24,6 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
} }
String MergingSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "MergingSorted(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// The order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns) void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns)
{ {
/// Read the first blocks, initialize the queue. /// Read the first blocks, initialize the queue.
@ -53,10 +31,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
{ {
first = false; first = false;
size_t i = 0; for (size_t i = 0; i < source_blocks.size(); ++i)
for (auto it = source_blocks.begin(); it != source_blocks.end(); ++it, ++i)
{ {
SharedBlockPtr & shared_block_ptr = *it; SharedBlockPtr & shared_block_ptr = source_blocks[i];
if (shared_block_ptr.get()) if (shared_block_ptr.get())
continue; continue;
@ -75,6 +52,8 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
expected_block_size = std::min(rows, max_block_size); expected_block_size = std::min(rows, max_block_size);
cursors[i] = SortCursorImpl(*shared_block_ptr, description, i); cursors[i] = SortCursorImpl(*shared_block_ptr, description, i);
shared_block_ptr->all_columns = cursors[i].all_columns;
shared_block_ptr->sort_columns = cursors[i].sort_columns;
has_collation |= cursors[i].has_collation; has_collation |= cursors[i].has_collation;
} }
@ -173,25 +152,20 @@ Block MergingSortedBlockInputStream::readImpl()
template <typename TSortCursor> template <typename TSortCursor>
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue) void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
{ {
size_t i = 0; size_t order = current.impl->order;
size_t size = cursors.size(); size_t size = cursors.size();
for (; i < size; ++i)
{
if (&cursors[i] == current.impl)
{
source_blocks[i] = new detail::SharedBlock(children[i]->read());
if (*source_blocks[i])
{
cursors[i].reset(*source_blocks[i]);
queue.push(TSortCursor(&cursors[i]));
}
break; if (order >= size || &cursors[order] != current.impl)
}
}
if (i == size)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
source_blocks[order] = new detail::SharedBlock(children[order]->read());
if (*source_blocks[order])
{
cursors[order].reset(*source_blocks[order]);
queue.push(TSortCursor(&cursors[order]));
source_blocks[order]->all_columns = cursors[order].all_columns;
source_blocks[order]->sort_columns = cursors[order].sort_columns;
}
} }
template template

View File

@ -30,13 +30,15 @@ namespace ErrorCodes
/// The reference counter is not atomic, since it is used from one thread. /// The reference counter is not atomic, since it is used from one thread.
namespace detail namespace detail
{ {
struct SharedBlock : Block struct SharedBlock : Block
{ {
int refcount = 0; int refcount = 0;
SharedBlock(Block && value_) ColumnRawPtrs all_columns;
: Block(std::move(value_)) {}; ColumnRawPtrs sort_columns;
};
SharedBlock(Block && block) : Block(std::move(block)) {}
};
} }
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>; using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
@ -68,16 +70,16 @@ public:
String getName() const override { return "MergingSorted"; } String getName() const override { return "MergingSorted"; }
String getID() const override;
bool isGroupedOutput() const override { return true; } bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; } const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
struct RowRef struct RowRef
{ {
ColumnRawPtrs columns; ColumnRawPtrs * columns = nullptr;
size_t row_num; size_t row_num;
SharedBlockPtr shared_block; SharedBlockPtr shared_block;
@ -91,9 +93,9 @@ protected:
/// The number and types of columns must match. /// The number and types of columns must match.
bool operator==(const RowRef & other) const bool operator==(const RowRef & other) const
{ {
size_t size = columns.size(); size_t size = columns->size();
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
if (0 != columns[i]->compareAt(row_num, other.row_num, *other.columns[i], 1)) if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false; return false;
return true; return true;
} }
@ -103,8 +105,8 @@ protected:
return !(*this == other); return !(*this == other);
} }
bool empty() const { return columns.empty(); } bool empty() const { return columns == nullptr; }
size_t size() const { return columns.size(); } size_t size() const { return empty() ? 0 : columns->size(); }
}; };
@ -190,9 +192,7 @@ protected:
{ {
row_ref.row_num = cursor.impl->pos; row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order]; row_ref.shared_block = source_blocks[cursor.impl->order];
row_ref.columns = &row_ref.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
row_ref.columns[i] = cursor->all_columns[i];
} }
template <typename TSortCursor> template <typename TSortCursor>
@ -200,9 +200,7 @@ protected:
{ {
row_ref.row_num = cursor.impl->pos; row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order]; row_ref.shared_block = source_blocks[cursor.impl->order];
row_ref.columns = &row_ref.shared_block->sort_columns;
for (size_t i = 0; i < cursor->sort_columns_size; ++i)
row_ref.columns[i] = cursor->sort_columns[i];
} }
private: private:

View File

@ -19,23 +19,40 @@ namespace ErrorCodes
extern const int INCORRECT_INDEX; extern const int INCORRECT_INDEX;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_READ_ALL_DATA;
extern const int NOT_IMPLEMENTED;
} }
NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_, UInt64 server_revision_, NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
bool use_index_, : istr(istr_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_, IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_) IndexForNativeFormat::Blocks::const_iterator index_block_end_)
: istr(istr_), server_revision(server_revision_), : istr(istr_), server_revision(server_revision_),
use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_) use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_)
{ {
if (use_index)
{
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr); istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
if (!istr_concrete) if (!istr_concrete)
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR); throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
if (index_block_it == index_block_end)
return;
index_column_it = index_block_it->columns.begin(); index_column_it = index_block_it->columns.begin();
/// Initialize header from the index.
for (const auto & column : index_block_it->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert({ type->createColumn(), type, column.name });
} }
} }
@ -50,6 +67,12 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
} }
Block NativeBlockInputStream::getHeader() const
{
return header;
}
Block NativeBlockInputStream::readImpl() Block NativeBlockInputStream::readImpl()
{ {
Block res; Block res;

View File

@ -60,35 +60,33 @@ struct IndexForNativeFormat
class NativeBlockInputStream : public IProfilingBlockInputStream class NativeBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
/** If a non-zero server_revision is specified, additional block information may be expected and read. /// If a non-zero server_revision is specified, additional block information may be expected and read.
* NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
*/ /// For cases when data structure (header) is known in advance.
NativeBlockInputStream( /// NOTE We may use header for data validation and/or type conversions. It is not implemented.
ReadBuffer & istr_, UInt64 server_revision_ = 0, NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
bool use_index_ = false,
IndexForNativeFormat::Blocks::const_iterator index_block_it_ = IndexForNativeFormat::Blocks::const_iterator{}, /// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
IndexForNativeFormat::Blocks::const_iterator index_block_end_ = IndexForNativeFormat::Blocks::const_iterator{}); NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_);
String getName() const override { return "Native"; } String getName() const override { return "Native"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint); static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint);
Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;
private: private:
ReadBuffer & istr; ReadBuffer & istr;
Block header;
UInt64 server_revision; UInt64 server_revision;
bool use_index; bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it; IndexForNativeFormat::Blocks::const_iterator index_block_it;
IndexForNativeFormat::Blocks::const_iterator index_block_end; IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it; IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;

View File

@ -28,12 +28,7 @@ public:
String getName() const override { return "NullAndDoCopy"; } String getName() const override { return "NullAndDoCopy"; }
String getID() const override Block getHeader() const override { return {}; }
{
std::stringstream res;
res << "copy from " << input->getID();
return res.str();
}
protected: protected:
Block readImpl() override Block readImpl() override

View File

@ -6,14 +6,19 @@
namespace DB namespace DB
{ {
/** Empty stream of blocks. /** Empty stream of blocks of specified structure.
*/ */
class NullBlockInputStream : public IBlockInputStream class NullBlockInputStream : public IBlockInputStream
{ {
public: public:
Block read() override { return Block(); } NullBlockInputStream(const Block & header) : header(header) {}
Block read() override { return {}; }
Block getHeader() const override { return header; }
String getName() const override { return "Null"; } String getName() const override { return "Null"; }
private:
Block header;
}; };
} }

View File

@ -17,18 +17,12 @@ namespace ErrorCodes
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream( NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
const BlockInputStreamPtr & input, const BlockInputStreamPtr & input,
const Block & in_sample_, const Block & out_sample_) const Block & in_sample_, const Block & out_sample_)
: header(out_sample_)
{ {
buildActions(in_sample_, out_sample_); buildActions(in_sample_, out_sample_);
children.push_back(input); children.push_back(input);
} }
String NullableAdapterBlockInputStream::getID() const
{
std::stringstream res;
res << "NullableAdapterBlockInputStream(" << children.back()->getID() << ")";
return res.str();
}
Block NullableAdapterBlockInputStream::readImpl() Block NullableAdapterBlockInputStream::readImpl()
{ {
Block block = children.back()->read(); Block block = children.back()->read();

View File

@ -18,12 +18,11 @@ namespace DB
class NullableAdapterBlockInputStream : public IProfilingBlockInputStream class NullableAdapterBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, const Block & out_sample_);
const Block & out_sample_);
String getName() const override { return "NullableAdapterBlockInputStream"; } String getName() const override { return "NullableAdapterBlockInputStream"; }
String getID() const override; Block getHeader() const override { return header; }
protected: protected:
Block readImpl() override; Block readImpl() override;
@ -52,6 +51,7 @@ private:
void buildActions(const Block & in_sample, const Block & out_sample); void buildActions(const Block & in_sample, const Block & out_sample);
private: private:
Block header;
Actions actions; Actions actions;
std::vector<std::optional<String>> rename; std::vector<std::optional<String>> rename;
bool must_transform = false; bool must_transform = false;

View File

@ -16,6 +16,14 @@ public:
String getName() const override { return "One"; } String getName() const override { return "One"; }
Block getHeader() const override
{
Block res;
for (const auto & elem : block)
res.insert({ elem.column->cloneEmpty(), elem.type, elem.name });
return res;
}
protected: protected:
Block readImpl() override Block readImpl() override
{ {

View File

@ -20,13 +20,13 @@ public:
children.push_back(stream); children.push_back(stream);
} }
Block getHeader() const override { return children.at(0)->getHeader(); }
private: private:
Block readImpl() override { return stream->read(); } Block readImpl() override { return stream->read(); }
String getName() const override { return "Owning"; } String getName() const override { return "Owning"; }
String getID() const override { return "Owning(" + stream->getID() + ")"; }
protected: protected:
BlockInputStreamPtr stream; BlockInputStreamPtr stream;
std::unique_ptr<OwnType> own; std::unique_ptr<OwnType> own;

View File

@ -29,23 +29,9 @@ ParallelAggregatingBlockInputStream::ParallelAggregatingBlockInputStream(
} }
String ParallelAggregatingBlockInputStream::getID() const Block ParallelAggregatingBlockInputStream::getHeader() const
{ {
std::stringstream res; return aggregator.getHeader(final);
res << "ParallelAggregating(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ", " << aggregator.getID() << ")";
return res.str();
} }
@ -122,8 +108,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
{ {
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num], parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns, parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].key_sizes, parent.threads_data[thread_num].key, parent.threads_data[thread_num].key, parent.no_more_keys);
parent.no_more_keys);
parent.threads_data[thread_num].src_rows += block.rows(); parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes(); parent.threads_data[thread_num].src_bytes += block.bytes();
@ -212,6 +197,13 @@ void ParallelAggregatingBlockInputStream::execute()
<< "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)" << "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec." << " in " << elapsed_seconds << " sec."
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)"); << " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
/// If there was no data, and we aggregate without keys, we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0],
threads_data[0].key_columns, threads_data[0].aggregate_columns,
threads_data[0].key, no_more_keys);
} }
} }

View File

@ -27,10 +27,10 @@ public:
String getName() const override { return "ParallelAggregating"; } String getName() const override { return "ParallelAggregating"; }
String getID() const override;
void cancel() override; void cancel() override;
Block getHeader() const override;
protected: protected:
/// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor. /// Do nothing that preparation to execution of the query be done in parallel, in ParallelInputsProcessor.
void readPrefix() override void readPrefix() override
@ -83,14 +83,12 @@ private:
StringRefs key; StringRefs key;
ColumnRawPtrs key_columns; ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns; Aggregator::AggregateColumns aggregate_columns;
Sizes key_sizes;
ThreadData(size_t keys_size, size_t aggregates_size) ThreadData(size_t keys_size, size_t aggregates_size)
{ {
key.resize(keys_size); key.resize(keys_size);
key_columns.resize(keys_size); key_columns.resize(keys_size);
aggregate_columns.resize(aggregates_size); aggregate_columns.resize(aggregates_size);
key_sizes.resize(keys_size);
} }
}; };

View File

@ -23,22 +23,12 @@ public:
String getName() const override { return "PartialSorting"; } String getName() const override { return "PartialSorting"; }
String getID() const override
{
std::stringstream res;
res << "PartialSorting(" << children.back()->getID();
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
bool isGroupedOutput() const override { return true; } bool isGroupedOutput() const override { return true; }
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; } const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -2,6 +2,7 @@
#include <DataStreams/OneBlockInputStream.h> #include <DataStreams/OneBlockInputStream.h>
#include <Common/NetException.h> #include <Common/NetException.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/castColumn.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
@ -17,9 +18,9 @@ namespace ErrorCodes
RemoteBlockInputStream::RemoteBlockInputStream( RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection, Connection & connection,
const String & query_, const Context & context_, const Settings * settings, const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_) : header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{ {
if (settings) if (settings)
context.setSettings(*settings); context.setSettings(*settings);
@ -32,9 +33,9 @@ RemoteBlockInputStream::RemoteBlockInputStream(
RemoteBlockInputStream::RemoteBlockInputStream( RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections, std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings, const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_) : header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{ {
if (settings) if (settings)
context.setSettings(*settings); context.setSettings(*settings);
@ -48,9 +49,9 @@ RemoteBlockInputStream::RemoteBlockInputStream(
RemoteBlockInputStream::RemoteBlockInputStream( RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool, const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings, const String & query_, const Block & header_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_) const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_) : header(header_), query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{ {
if (settings) if (settings)
context.setSettings(*settings); context.setSettings(*settings);
@ -148,6 +149,25 @@ void RemoteBlockInputStream::sendExternalTables()
multiplexed_connections->sendExternalTablesData(external_tables_data); multiplexed_connections->sendExternalTablesData(external_tables_data);
} }
/** If we receive a block with slightly different column types, or with excessive columns,
* we will adapt it to expected structure.
*/
static Block adaptBlockStructure(const Block & block, const Block & header, const Context & context)
{
/// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
if (!header)
return block;
Block res;
res.info = block.info;
for (const auto & elem : header)
res.insert({ castColumn(block.getByName(elem.name), elem.type, context), elem.type, elem.name });
return res;
}
Block RemoteBlockInputStream::readImpl() Block RemoteBlockInputStream::readImpl()
{ {
if (!sent_query) if (!sent_query)
@ -170,7 +190,7 @@ Block RemoteBlockInputStream::readImpl()
case Protocol::Server::Data: case Protocol::Server::Data:
/// If the block is not empty and is not a header block /// If the block is not empty and is not a header block
if (packet.block && (packet.block.rows() > 0)) if (packet.block && (packet.block.rows() > 0))
return packet.block; return adaptBlockStructure(packet.block, header, context);
break; /// If the block is empty - we will receive other packets before EndOfStream. break; /// If the block is empty - we will receive other packets before EndOfStream.
case Protocol::Server::Exception: case Protocol::Server::Exception:

View File

@ -24,7 +24,7 @@ public:
/// If `settings` is nullptr, settings will be taken from context. /// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream( RemoteBlockInputStream(
Connection & connection, Connection & connection,
const String & query_, const Context & context_, const Settings * settings = nullptr, const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(), const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -32,7 +32,7 @@ public:
/// If `settings` is nullptr, settings will be taken from context. /// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream( RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections, std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings = nullptr, const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(), const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -40,7 +40,7 @@ public:
/// If `settings` is nullptr, settings will be taken from context. /// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream( RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool, const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings = nullptr, const String & query_, const Block & header_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(), const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
@ -66,18 +66,13 @@ public:
String getName() const override { return "Remote"; } String getName() const override { return "Remote"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
BlockExtraInfo getBlockExtraInfo() const override BlockExtraInfo getBlockExtraInfo() const override
{ {
return multiplexed_connections->getBlockExtraInfo(); return multiplexed_connections->getBlockExtraInfo();
} }
Block getHeader() const override { return header; }
protected: protected:
/// Send all temporary tables to remote servers /// Send all temporary tables to remote servers
void sendExternalTables(); void sendExternalTables();
@ -95,10 +90,14 @@ protected:
private: private:
void sendQuery(); void sendQuery();
Block receiveBlock();
/// If wasn't sent yet, send request to cancell all connections to replicas /// If wasn't sent yet, send request to cancell all connections to replicas
void tryCancel(const char * reason); void tryCancel(const char * reason);
private: private:
Block header;
std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections; std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections;
std::unique_ptr<MultiplexedConnections> multiplexed_connections; std::unique_ptr<MultiplexedConnections> multiplexed_connections;

View File

@ -22,16 +22,15 @@ public:
String getName() const override { return "RemoveColumns"; } String getName() const override { return "RemoveColumns"; }
String getID() const override Block getHeader() const override
{ {
std::stringstream res; Block res = children.back()->getHeader();
res << "RemoveColumns(" << children.back()->getID();
for (const auto & it : columns_to_remove) for (const auto & it : columns_to_remove)
res << ", " << it; if (res.has(it))
res.erase(it);
res << ")"; return res;
return res.str();
} }
protected: protected:

View File

@ -26,7 +26,7 @@ void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns,
++merged_rows; ++merged_rows;
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num); merged_columns[i]->insertFrom(*(*selected_row.columns)[i], selected_row.row_num);
} }
@ -52,8 +52,6 @@ Block ReplacingSortedBlockInputStream::readImpl()
/// Additional initialization. /// Additional initialization.
if (selected_row.empty()) if (selected_row.empty())
{ {
selected_row.columns.resize(num_columns);
if (!version_column.empty()) if (!version_column.empty())
version_column_number = header.getPositionByName(version_column); version_column_number = header.getPositionByName(version_column);
} }
@ -73,12 +71,7 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
SortCursor current = queue.top(); SortCursor current = queue.top();
if (current_key.empty()) if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current); setPrimaryKeyRef(current_key, current);
}
UInt64 version = version_column_number != -1 UInt64 version = version_column_number != -1
? current->all_columns[version_column_number]->get64(current->pos) ? current->all_columns[version_column_number]->get64(current->pos)

View File

@ -24,23 +24,6 @@ public:
String getName() const override { return "ReplacingSorted"; } String getName() const override { return "ReplacingSorted"; }
String getID() const override
{
std::stringstream res;
res << "ReplacingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", version_column, " << version_column << ")";
return res.str();
}
protected: protected:
/// Can return 1 more records than max_block_size. /// Can return 1 more records than max_block_size.
Block readImpl() override; Block readImpl() override;

View File

@ -16,12 +16,7 @@ public:
String getName() const override { return "Squashing"; } String getName() const override { return "Squashing"; }
String getID() const override Block getHeader() const override { return children.at(0)->getHeader(); }
{
std::stringstream res;
res << "Squashing(" << children.at(0)->getID() << ")";
return res.str();
}
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -23,24 +23,6 @@ namespace ErrorCodes
} }
String SummingSortedBlockInputStream::getID() const
{
std::stringstream res;
res << "SummingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ")";
return res.str();
}
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion) void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
{ {
for (auto & desc : columns_to_aggregate) for (auto & desc : columns_to_aggregate)
@ -131,7 +113,6 @@ Block SummingSortedBlockInputStream::readImpl()
if (current_row.empty()) if (current_row.empty())
{ {
current_row.resize(num_columns); current_row.resize(num_columns);
next_key.columns.resize(description.size());
/// name of nested structure -> the column numbers that refer to it. /// name of nested structure -> the column numbers that refer to it.
std::unordered_map<std::string, std::vector<size_t>> discovered_maps; std::unordered_map<std::string, std::vector<size_t>> discovered_maps;
@ -324,7 +305,6 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
if (current_key.empty()) /// The first key encountered. if (current_key.empty()) /// The first key encountered.
{ {
current_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current); setPrimaryKeyRef(current_key, current);
key_differs = true; key_differs = true;
} }

View File

@ -35,8 +35,6 @@ public:
String getName() const override { return "SummingSorted"; } String getName() const override { return "SummingSorted"; }
String getID() const override;
protected: protected:
/// Can return 1 more records than max_block_size. /// Can return 1 more records than max_block_size.
Block readImpl() override; Block readImpl() override;

View File

@ -1,6 +1,7 @@
#include <DataStreams/TotalsHavingBlockInputStream.h> #include <DataStreams/TotalsHavingBlockInputStream.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/AggregateDescription.h> #include <Interpreters/AggregateDescription.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h> #include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
@ -29,26 +30,18 @@ TotalsHavingBlockInputStream::TotalsHavingBlockInputStream(
} }
String TotalsHavingBlockInputStream::getID() const
{
std::stringstream res;
res << "TotalsHavingBlockInputStream(" << children.back()->getID()
<< "," << filter_column_name << ")";
return res.str();
}
static void finalize(Block & block) static void finalize(Block & block)
{ {
for (size_t i = 0; i < block.columns(); ++i) for (size_t i = 0; i < block.columns(); ++i)
{ {
ColumnWithTypeAndName & current = block.safeGetByPosition(i); ColumnWithTypeAndName & current = block.safeGetByPosition(i);
const ColumnAggregateFunction * unfinalized_column = typeid_cast<const ColumnAggregateFunction *>(current.column.get()); const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_column) if (unfinalized_type)
{ {
current.type = unfinalized_column->getAggregateFunction()->getReturnType(); current.type = unfinalized_type->getReturnType();
current.column = unfinalized_column->convertToValues(); if (current.column)
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
} }
} }
} }
@ -70,7 +63,7 @@ const Block & TotalsHavingBlockInputStream::getTotals()
addToTotals(overflow_aggregates, nullptr); addToTotals(overflow_aggregates, nullptr);
} }
totals = header.cloneWithColumns(std::move(current_totals)); totals = children.at(0)->getHeader().cloneWithColumns(std::move(current_totals));
finalize(totals); finalize(totals);
} }
@ -81,6 +74,16 @@ const Block & TotalsHavingBlockInputStream::getTotals()
} }
Block TotalsHavingBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalize(res);
if (expression)
expression->execute(res);
return res;
}
Block TotalsHavingBlockInputStream::readImpl() Block TotalsHavingBlockInputStream::readImpl()
{ {
Block finalized; Block finalized;
@ -90,9 +93,6 @@ Block TotalsHavingBlockInputStream::readImpl()
{ {
block = children[0]->read(); block = children[0]->read();
if (!header)
header = block.cloneEmpty();
/// Block with values not included in `max_rows_to_group_by`. We'll postpone it. /// Block with values not included in `max_rows_to_group_by`. We'll postpone it.
if (overflow_row && block && block.info.is_overflows) if (overflow_row && block && block.info.is_overflows)
{ {

View File

@ -19,6 +19,7 @@ private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>; using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public: public:
/// expression may be nullptr
TotalsHavingBlockInputStream( TotalsHavingBlockInputStream(
const BlockInputStreamPtr & input_, const BlockInputStreamPtr & input_,
bool overflow_row_, const ExpressionActionsPtr & expression_, bool overflow_row_, const ExpressionActionsPtr & expression_,
@ -26,10 +27,10 @@ public:
String getName() const override { return "TotalsHaving"; } String getName() const override { return "TotalsHaving"; }
String getID() const override;
const Block & getTotals() override; const Block & getTotals() override;
Block getHeader() const override;
protected: protected:
Block readImpl() override; Block readImpl() override;
@ -42,8 +43,6 @@ private:
size_t passed_keys = 0; size_t passed_keys = 0;
size_t total_keys = 0; size_t total_keys = 0;
Block header;
/** Here are the values that did not pass max_rows_to_group_by. /** Here are the values that did not pass max_rows_to_group_by.
* They are added or not added to the current_totals, depending on the totals_mode. * They are added or not added to the current_totals, depending on the totals_mode.
*/ */

View File

@ -86,26 +86,6 @@ public:
String getName() const override { return "Union"; } String getName() const override { return "Union"; }
String getID() const override
{
std::stringstream res;
res << "Union(";
Strings children_ids(children.size());
for (size_t i = 0; i < children.size(); ++i)
children_ids[i] = children[i]->getID();
/// Order does not matter.
std::sort(children_ids.begin(), children_ids.end());
for (size_t i = 0; i < children_ids.size(); ++i)
res << (i == 0 ? "" : ", ") << children_ids[i];
res << ")";
return res.str();
}
~UnionBlockInputStream() override ~UnionBlockInputStream() override
{ {
try try
@ -139,6 +119,8 @@ public:
return doGetBlockExtraInfo(); return doGetBlockExtraInfo();
} }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected: protected:
void finalize() void finalize()
{ {

View File

@ -0,0 +1,185 @@
#include <Common/FieldVisitors.h>
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{
if constexpr (sizeof(RowSourcePart) == 1)
buffer.write(*reinterpret_cast<const char *>(&row_source));
else
buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart));
}
void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size)
{
if (out_row_sources_buf)
{
for (size_t i = 0; i < gap_size; ++i)
{
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
}
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns)
{
const auto & columns = row.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*columns[i], row.row_num);
insertGap(skip_rows);
if (out_row_sources_buf)
{
current_row_sources.front().setSkipFlag(false);
writeRowSourcePart(*out_row_sources_buf, current_row_sources.front());
current_row_sources.pop();
}
}
Block VersionedCollapsingSortedBlockInputStream::readImpl()
{
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
Block header;
MutableColumns merged_columns;
bool is_initialized = !first;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED);
if (merged_columns.empty())
return {};
/// Additional initialization.
if (!is_initialized)
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns));
}
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;
auto update_queue = [this, & queue](SortCursor & cursor)
{
queue.pop();
if (out_row_sources_buf)
current_row_sources.emplace(cursor->order, true);
if (!cursor->isLast())
{
cursor->next();
queue.push(cursor);
}
else
{
/// We take next block from the corresponding source, if there is one.
fetchNextBlock(cursor, queue);
}
};
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
while (!queue.empty())
{
SortCursor current = queue.top();
RowRef next_key;
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
setPrimaryKeyRef(next_key, current);
size_t rows_to_merge = 0;
/// Each branch either updates queue or increases rows_to_merge.
if (current_keys.empty())
{
sign_in_queue = sign;
current_keys.pushBack(next_key);
update_queue(current);
}
else
{
if (current_keys.back() == next_key)
{
update_queue(current);
/// If all the rows was collapsed, we still want to give at least one block in the result.
/// If queue is empty then don't collapse two last rows.
if (sign == sign_in_queue || (!can_collapse_all_rows && blocks_written == 0
&& merged_rows == 0 && queue.empty() && current_keys.size() == 1))
current_keys.pushBack(next_key);
else
{
current_keys.popBack();
current_keys.pushGap(2);
}
}
else
rows_to_merge = current_keys.size();
}
if (current_keys.size() > max_rows_in_queue)
rows_to_merge = std::max(rows_to_merge, current_keys.size() - max_rows_in_queue);
while (rows_to_merge)
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row, merged_columns);
current_keys.popFront();
++merged_rows;
--rows_to_merge;
if (merged_rows >= max_block_size)
{
++blocks_written;
return;
}
}
}
while (!current_keys.empty())
{
const auto & row = current_keys.front();
auto gap = current_keys.frontGap();
insertRow(gap, row, merged_columns);
current_keys.popFront();
++merged_rows;
}
/// Write information about last collapsed rows.
insertGap(current_keys.frontGap());
finished = true;
}
}

View File

@ -0,0 +1,222 @@
#pragma once
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <deque>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
/* Deque with fixed memory size. Allows pushing gaps.
* frontGap() returns the number of gaps were inserted before front.
*
* This structure may be implemented via std::deque, but
* - Deque uses fixed amount of memory which is allocated in constructor. No more allocations are performed.
* - Gaps are not stored as separate values in queue, which is more memory efficient.
* - Deque is responsible for gaps invariant: after removing element, moves gaps into neighbor cell.
*
* Note: empty deque may have non-zero front gap.
*/
template <typename T>
class FixedSizeDequeWithGaps
{
public:
struct ValueWithGap
{
/// The number of gaps before current element. The number of gaps after last element stores into end cell.
size_t gap;
/// Store char[] instead of T in order to make ValueWithGap POD.
/// Call placement constructors after push and and destructors after pop.
char value[sizeof(T)];
};
explicit FixedSizeDequeWithGaps(size_t size)
{
container.resize_fill(size + 1);
}
~FixedSizeDequeWithGaps()
{
auto destruct_range = [this](size_t from, size_t to)
{
for (size_t i = from; i < to; ++i)
destructValue(i);
};
if (begin <= end)
destruct_range(begin, end);
else
{
destruct_range(0, end);
destruct_range(begin, container.size());
}
}
void pushBack(const T & value)
{
checkEnoughSpaceToInsert();
constructValue(end, value);
moveRight(end);
container[end].gap = 0;
}
void pushGap(size_t count) { container[end].gap += count; }
void popBack()
{
checkHasValuesToRemove();
size_t curr_gap = container[end].gap;
moveLeft(end);
destructValue(end);
container[end].gap += curr_gap;
}
void popFront()
{
checkHasValuesToRemove();
destructValue(begin);
moveRight(begin);
}
T & front()
{
checkHasValuesToGet();
return getValue(begin);
}
const T & front() const
{
checkHasValuesToGet();
return getValue(begin);
}
const T & back() const
{
size_t ps = end;
moveLeft(ps);
return getValue(ps);
}
size_t & frontGap() { return container[begin].gap; }
const size_t & frontGap() const { return container[begin].gap; }
size_t size() const
{
if (begin <= end)
return end - begin;
return end + (container.size() - begin);
}
bool empty() const { return begin == end; }
private:
PODArray<ValueWithGap> container;
size_t gap_before_first = 0;
size_t begin = 0;
size_t end = 0;
void constructValue(size_t index, const T & value) { new (container[index].value) T(value); }
void destructValue(size_t index) { reinterpret_cast<T *>(container[index].value)->~T(); }
T & getValue(size_t index) { return *reinterpret_cast<T *>(container[index].value); }
const T & getValue(size_t index) const { return *reinterpret_cast<const T *>(container[index].value); }
void moveRight(size_t & index) const
{
++index;
if (index == container.size())
index = 0;
}
void moveLeft(size_t & index) const
{
if (index == 0)
index = container.size();
--index;
}
void checkEnoughSpaceToInsert() const
{
if (size() + 1 == container.size())
throw Exception("Not enough space to insert into FixedSizeDequeWithGaps with capacity "
+ toString(container.size() - 1), ErrorCodes::LOGICAL_ERROR);
}
void checkHasValuesToRemove() const
{
if (empty())
throw Exception("Cannot remove from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
}
void checkHasValuesToGet() const
{
if (empty())
throw Exception("Cannot get value from empty FixedSizeDequeWithGaps", ErrorCodes::LOGICAL_ERROR);
}
};
class VersionedCollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
/// Don't need version column. It's in primary key.
/// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr).
VersionedCollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
}
String getName() const override { return "VersionedCollapsingSorted"; }
protected:
/// Can return 1 more records than max_block_size.
Block readImpl() override;
private:
String sign_column;
size_t sign_column_number = 0;
Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");
/// Read is finished.
bool finished = false;
Int8 sign_in_queue = 0;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
size_t blocks_written = 0;
/// Sources of rows for VERTICAL merge algorithm. Size equals to (size + number of gaps) in current_keys.
std::queue<RowSourcePart> current_row_sources;
const bool can_collapse_all_rows;
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result row for the current primary key.
void insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns);
void insertGap(size_t gap_size);
};
}

View File

@ -112,7 +112,7 @@ public:
bool equals(const IDataType & rhs) const override; bool equals(const IDataType & rhs) const override;
bool textCanContainOnlyValidUTF8() const override; bool textCanContainOnlyValidUTF8() const override;
size_t getSizeOfValueInMemory() const override { return sizeof(Field); } size_t getSizeOfValueInMemory() const override { return sizeof(FieldType); }
}; };

View File

@ -73,7 +73,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/ */
if (is_local) if (is_local)
return executeQuery(load_all_query, context, true).in; return executeQuery(load_all_query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, context); return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
} }
@ -103,7 +103,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{ {
if (is_local) if (is_local)
return executeQuery(query, context, true).in; return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, context); return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
} }
} }

View File

@ -8,13 +8,6 @@ DictionaryBlockInputStreamBase::DictionaryBlockInputStreamBase(size_t rows_count
{ {
} }
String DictionaryBlockInputStreamBase::getID() const
{
std::stringstream ss;
ss << static_cast<const void*>(this);
return ss.str();
}
Block DictionaryBlockInputStreamBase::readImpl() Block DictionaryBlockInputStreamBase::readImpl()
{ {
if (next_row == rows_count) if (next_row == rows_count)
@ -26,4 +19,9 @@ Block DictionaryBlockInputStreamBase::readImpl()
return block; return block;
} }
Block DictionaryBlockInputStreamBase::getHeader() const
{
return getBlock(0, 0);
}
} }

View File

@ -11,17 +11,16 @@ protected:
DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size); DictionaryBlockInputStreamBase(size_t rows_count, size_t max_block_size);
String getID() const override;
virtual Block getBlock(size_t start, size_t length) const = 0; virtual Block getBlock(size_t start, size_t length) const = 0;
Block getHeader() const override;
private: private:
const size_t rows_count; const size_t rows_count;
const size_t max_block_size; const size_t max_block_size;
size_t next_row; size_t next_row = 0;
Block readImpl() override; Block readImpl() override;
void readPrefixImpl() override { next_row = 0; }
}; };
} }

View File

@ -101,6 +101,8 @@ public:
} }
} }
Block getHeader() const override { return stream->getHeader(); };
private: private:
Block readImpl() override { return stream->read(); } Block readImpl() override { return stream->read(); }
@ -118,7 +120,6 @@ private:
} }
String getName() const override { return "WithBackgroundThread"; } String getName() const override { return "WithBackgroundThread"; }
String getID() const override { return "WithBackgroundThread(" + stream->getID() + ")"; }
BlockInputStreamPtr stream; BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command; std::unique_ptr<ShellCommand> command;

View File

@ -7,7 +7,6 @@
#include <ext/bit_cast.h> #include <ext/bit_cast.h>
#include <ext/range.h> #include <ext/range.h>
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
@ -15,6 +14,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int FILE_DOESNT_EXIST; extern const int FILE_DOESNT_EXIST;
extern const int EXTERNAL_LIBRARY_ERROR;
} }
@ -41,18 +41,16 @@ public:
private: private:
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr; std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
Container strings_holder; Container strings_holder;
}; };
namespace namespace
{ {
const std::string lib_config_settings = ".settings";
const std::string lib_config_settings = ".settings";
CStringsHolder getLibSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_root) CStringsHolder getLibSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{ {
Poco::Util::AbstractConfiguration::Keys config_keys; Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys); config.keys(config_root, config_keys);
CStringsHolder::Container strings; CStringsHolder::Container strings;
@ -66,15 +64,19 @@ CStringsHolder getLibSettings(const Poco::Util::AbstractConfiguration & config,
strings.emplace_back(config.getString(config_root + '.' + key)); strings.emplace_back(config.getString(config_root + '.' + key));
} }
return CStringsHolder(strings); return CStringsHolder(strings);
} }
Block dataToBlock(const Block & sample_block, const void * data) Block dataToBlock(const Block & sample_block, const void * data)
{ {
if (!data) if (!data)
return sample_block.cloneEmpty(); return sample_block.cloneEmpty();
auto columns_received = static_cast<const ClickHouseLibrary::ColumnsUInt64 *>(data); auto columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception("Received error: " + std::to_string(columns_received->error_code) + " "
+ (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns(sample_block.columns()); MutableColumns columns(sample_block.columns());
for (const auto i : ext::range(0, columns.size())) for (const auto i : ext::range(0, columns.size()))
@ -83,17 +85,23 @@ Block dataToBlock(const Block & sample_block, const void * data)
for (size_t col_n = 0; col_n < columns_received->size; ++col_n) for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{ {
if (columns.size() != columns_received->data[col_n].size) if (columns.size() != columns_received->data[col_n].size)
throw Exception("Received unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be" throw Exception("Received unexpected number of columns: " + std::to_string(columns_received->data[col_n].size)
+ std::to_string(columns.size()), + ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n) for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
columns[row_n]->insert(static_cast<UInt64>(columns_received->data[col_n].data[row_n])); {
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
continue;
const auto & size = field.size;
const auto & data = static_cast<const char *>(field.data);
columns[row_n]->insertData(data, size);
}
} }
return sample_block.cloneWithColumns(std::move(columns)); return sample_block.cloneWithColumns(std::move(columns));
} }
} }
@ -102,12 +110,12 @@ LibraryDictionarySource::LibraryDictionarySource(const DictionaryStructure & dic
const std::string & config_prefix, const std::string & config_prefix,
Block & sample_block, Block & sample_block,
const Context & context) const Context & context)
: log(&Logger::get("LibraryDictionarySource")), : log(&Logger::get("LibraryDictionarySource"))
dict_struct{dict_struct_}, , dict_struct{dict_struct_}
config_prefix{config_prefix}, , config_prefix{config_prefix}
path{config.getString(config_prefix + ".path", "")}, , path{config.getString(config_prefix + ".path", "")}
sample_block{sample_block}, , sample_block{sample_block}
context(context) , context(context)
{ {
if (!Poco::File(path).exists()) if (!Poco::File(path).exists())
{ {
@ -120,15 +128,15 @@ LibraryDictionarySource::LibraryDictionarySource(const DictionaryStructure & dic
} }
LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource & other) LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource & other)
: log(&Logger::get("LibraryDictionarySource")), : log(&Logger::get("LibraryDictionarySource"))
dict_struct{other.dict_struct}, , dict_struct{other.dict_struct}
config_prefix{other.config_prefix}, , config_prefix{other.config_prefix}
path{other.path}, , path{other.path}
sample_block{other.sample_block}, , sample_block{other.sample_block}
context(other.context), , context(other.context)
library{other.library}, , library{other.library}
description{other.description}, , description{other.description}
settings{other.settings} , settings{other.settings}
{ {
} }
@ -149,11 +157,11 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
/// Get function pointer before dataAllocate call because library->get may throw. /// Get function pointer before dataAllocate call because library->get may throw.
auto fptr auto fptr
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns))>("ClickHouseDictionary_v1_loadAll"); = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns))>("ClickHouseDictionary_v2_loadAll");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v1_dataAllocate")(); data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v2_dataAllocate")();
auto data = fptr(data_ptr, &settings->strings, &columns); auto data = fptr(data_ptr, &settings->strings, &columns);
auto block = dataToBlock(description.sample_block, data); auto block = dataToBlock(description.sample_block, data);
library->get<void (*)(void *)>("ClickHouseDictionary_v1_dataDelete")(data_ptr); library->get<void (*)(void *)>("ClickHouseDictionary_v2_dataDelete")(data_ptr);
return std::make_shared<OneBlockInputStream>(block); return std::make_shared<OneBlockInputStream>(block);
} }
@ -175,11 +183,11 @@ BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> &
/// Get function pointer before dataAllocate call because library->get may throw. /// Get function pointer before dataAllocate call because library->get may throw.
auto fptr = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&ids_data))>( auto fptr = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&ids_data))>(
"ClickHouseDictionary_v1_loadIds"); "ClickHouseDictionary_v2_loadIds");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v1_dataAllocate")(); data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v2_dataAllocate")();
auto data = fptr(data_ptr, &settings->strings, &columns_pass, &ids_data); auto data = fptr(data_ptr, &settings->strings, &columns_pass, &ids_data);
auto block = dataToBlock(description.sample_block, data); auto block = dataToBlock(description.sample_block, data);
library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v1_dataDelete")(data_ptr); library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v2_dataDelete")(data_ptr);
return std::make_shared<OneBlockInputStream>(block); return std::make_shared<OneBlockInputStream>(block);
} }
@ -187,16 +195,6 @@ BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_column
{ {
LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size()); LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size());
/*
auto columns_c = std::make_unique<ClickHouseLibrary::Columns>(key_columns.size() + 1);
size_t i = 0;
for (auto & column : key_columns)
{
columns_c[i] = column->getName().c_str();
++i;
}
columns_c[i] = nullptr;
*/
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(key_columns.size()); auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(key_columns.size());
ClickHouseLibrary::CStrings columns_pass{ ClickHouseLibrary::CStrings columns_pass{
static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), key_columns.size()}; static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), key_columns.size()};
@ -206,23 +204,24 @@ BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_column
columns_pass.data[key_columns_n] = column->getName().c_str(); columns_pass.data[key_columns_n] = column->getName().c_str();
++key_columns_n; ++key_columns_n;
} }
const ClickHouseLibrary::VectorUInt64 requested_rows_c{ext::bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(requested_rows.data()), requested_rows.size()}; const ClickHouseLibrary::VectorUInt64 requested_rows_c{
ext::bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(requested_rows.data()), requested_rows.size()};
void * data_ptr = nullptr; void * data_ptr = nullptr;
/// Get function pointer before dataAllocate call because library->get may throw. /// Get function pointer before dataAllocate call because library->get may throw.
auto fptr auto fptr
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&requested_rows_c))>( = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&requested_rows_c))>(
"ClickHouseDictionary_v1_loadKeys"); "ClickHouseDictionary_v2_loadKeys");
data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v1_dataAllocate")(); data_ptr = library->get<void * (*)()>("ClickHouseDictionary_v2_dataAllocate")();
auto data = fptr(data_ptr, &settings->strings, &columns_pass, &requested_rows_c); auto data = fptr(data_ptr, &settings->strings, &columns_pass, &requested_rows_c);
auto block = dataToBlock(description.sample_block, data); auto block = dataToBlock(description.sample_block, data);
library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v1_dataDelete")(data_ptr); library->get<void (*)(void * data_ptr)>("ClickHouseDictionary_v2_dataDelete")(data_ptr);
return std::make_shared<OneBlockInputStream>(block); return std::make_shared<OneBlockInputStream>(block);
} }
bool LibraryDictionarySource::isModified() const bool LibraryDictionarySource::isModified() const
{ {
auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v1_isModified"); auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v2_isModified");
if (fptr) if (fptr)
return fptr(&settings->strings); return fptr(&settings->strings);
return true; return true;
@ -230,7 +229,7 @@ bool LibraryDictionarySource::isModified() const
bool LibraryDictionarySource::supportsSelectiveLoad() const bool LibraryDictionarySource::supportsSelectiveLoad() const
{ {
auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v1_supportsSelectiveLoad"); auto fptr = library->tryGet<void * (*)(decltype(&settings->strings))>("ClickHouseDictionary_v2_supportsSelectiveLoad");
if (fptr) if (fptr)
return fptr(&settings->strings); return fptr(&settings->strings);
return true; return true;

View File

@ -1,26 +1,25 @@
#pragma once #pragma once
#include <Common/SharedLibrary.h>
#include <Dictionaries/DictionaryStructure.h> #include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ExternalResultDescription.h> #include <Dictionaries/ExternalResultDescription.h>
#include <Dictionaries/IDictionarySource.h> #include <Dictionaries/IDictionarySource.h>
#include <Common/SharedLibrary.h>
#include <common/LocalDateTime.h> #include <common/LocalDateTime.h>
namespace Poco namespace Poco
{ {
class Logger; class Logger;
namespace Util namespace Util
{ {
class AbstractConfiguration; class AbstractConfiguration;
} }
} }
namespace DB namespace DB
{ {
class CStringsHolder; class CStringsHolder;
/// Allows loading dictionaries from dynamic libraries (.so) /// Allows loading dictionaries from dynamic libraries (.so)
@ -65,5 +64,4 @@ private:
ExternalResultDescription description; ExternalResultDescription description;
std::shared_ptr<CStringsHolder> settings; std::shared_ptr<CStringsHolder> settings;
}; };
} }

Some files were not shown because too many files have changed in this diff Show More