merge master

Ivan Blinkov 2019-01-28 11:19:29 +03:00
commit 1b5308092b
316 changed files with 6123 additions and 2692 deletions


@@ -1,3 +1,140 @@
## ClickHouse release 19.1.6, 2019-01-24
### Backward Incompatible Change
* Removed `ALTER MODIFY PRIMARY KEY` command because it was superseded by the `ALTER MODIFY ORDER BY` command. [#3887](https://github.com/yandex/ClickHouse/pull/3887) ([ztlpn](https://github.com/ztlpn))
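For migration, a minimal sketch of the replacement syntax (the table and key columns here are hypothetical):

```sql
-- Instead of the removed ALTER MODIFY PRIMARY KEY, modify the sorting key:
ALTER TABLE hits MODIFY ORDER BY (CounterID, EventDate);
```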
### New Features
* Added the ability to choose per-column codecs for the Log and TinyLog storages. [#4111](https://github.com/yandex/ClickHouse/pull/4111) ([alesapin](https://github.com/alesapin))
* Added functions `filesystemAvailable`, `filesystemFree`, `filesystemCapacity`. [#4097](https://github.com/yandex/ClickHouse/pull/4097) ([bgranvea](https://github.com/bgranvea))
* Add custom compression codecs. [#3899](https://github.com/yandex/ClickHouse/pull/3899) ([alesapin](https://github.com/alesapin))
* Added hashing functions `xxHash64` and `xxHash32`. [#3905](https://github.com/yandex/ClickHouse/pull/3905) ([filimonov](https://github.com/filimonov))
* Added multiple joins emulation (very experimental). [#3946](https://github.com/yandex/ClickHouse/pull/3946) ([4ertus2](https://github.com/4ertus2))
* Added support for CatBoost multiclass models evaluation. Function `modelEvaluate` returns tuple with per-class raw predictions for multiclass models. `libcatboostmodel.so` should be built with [#607](https://github.com/catboost/catboost/pull/607). [#3959](https://github.com/yandex/ClickHouse/pull/3959) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Added the `gccMurmurHash` hashing function, which uses the same hash seed as [gcc](https://github.com/gcc-mirror/gcc/blob/41d6b10e96a1de98e90a7c0378437c3255814b16/libstdc%2B%2B-v3/include/bits/functional_hash.h#L191). [#4000](https://github.com/yandex/ClickHouse/pull/4000) ([sundy-li](https://github.com/sundy-li))
* Added the `Delta` compression codec. [#4052](https://github.com/yandex/ClickHouse/pull/4052) ([alesapin](https://github.com/alesapin))
* Added a multi-searcher for searching for multiple constant strings in a big haystack. Added the function family formed by (`multiPosition`, `multiSearch`, `firstMatch`) combined with the suffixes (none, `UTF8`, `CaseInsensitive`, `CaseInsensitiveUTF8`). [#4053](https://github.com/yandex/ClickHouse/pull/4053) ([danlark1](https://github.com/danlark1))
* Added the ability to alter compression codecs (see the sketch after this list). [#4054](https://github.com/yandex/ClickHouse/pull/4054) ([alesapin](https://github.com/alesapin))
* Added the ability to write data into HDFS, plus minor refactoring. [#4084](https://github.com/yandex/ClickHouse/pull/4084) ([alesapin](https://github.com/alesapin))
* Removed some redundant objects from compiled expressions cache (optimization). [#4042](https://github.com/yandex/ClickHouse/pull/4042) ([alesapin](https://github.com/alesapin))
* Added functions `JavaHash`, `HiveHash`. [#3811](https://github.com/yandex/ClickHouse/pull/3811) ([shangshujie365](https://github.com/shangshujie365))
* Added functions `left`, `right`, `trim`, `ltrim`, `rtrim`, `timestampadd`, `timestampsub`. [#3826](https://github.com/yandex/ClickHouse/pull/3826) ([blinkov](https://github.com/blinkov))
* Added the `remoteSecure` function. It works like `remote`, but uses a secure connection. [#4088](https://github.com/yandex/ClickHouse/pull/4088) ([proller](https://github.com/proller))
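A minimal sketch of the per-column codec features listed above (the table, columns, and codec choices are hypothetical examples, not taken from the PRs):

```sql
-- Per-column codecs are declared at CREATE time (#3899, #4111),
-- including the new Delta codec (#4052).
CREATE TABLE codec_example
(
    ts DateTime CODEC(Delta, LZ4),
    value Float64 CODEC(ZSTD)
) ENGINE = MergeTree ORDER BY ts;

-- Codecs can also be changed afterwards (#4054).
ALTER TABLE codec_example MODIFY COLUMN value Float64 CODEC(LZ4);
```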
### Improvements
* Support for `IF NOT EXISTS` in `ALTER TABLE ADD COLUMN` statements, and for `IF EXISTS` in `DROP/MODIFY/CLEAR/COMMENT COLUMN`. [#3900](https://github.com/yandex/ClickHouse/pull/3900) ([bgranvea](https://github.com/bgranvea))
* Function `parseDateTimeBestEffort`: support for formats `DD.MM.YYYY`, `DD.MM.YY`, `DD-MM-YYYY`, `DD-Mon-YYYY`, `DD/Month/YYYY` and similar. [#3922](https://github.com/yandex/ClickHouse/pull/3922) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Add a MergeTree setting `use_minimalistic_part_header_in_zookeeper`. If enabled, Replicated tables will store compact part metadata in a single part znode. This can dramatically reduce ZooKeeper snapshot size (especially if the tables have a lot of columns). Note that after enabling this setting you will not be able to downgrade to a version that doesn't support it. [#3960](https://github.com/yandex/ClickHouse/pull/3960) ([ztlpn](https://github.com/ztlpn))
* Added a DFA-based implementation for the functions `sequenceMatch` and `sequenceCount` for the case when the pattern doesn't contain time. [#4004](https://github.com/yandex/ClickHouse/pull/4004) ([ercolanelli-leo](https://github.com/ercolanelli-leo))
* Changed the way CapnProtoInputStream creates actions so that it now supports jagged structures. [#4063](https://github.com/yandex/ClickHouse/pull/4063) ([Miniwoffer](https://github.com/Miniwoffer))
* Better way to collect columns, tables and joins from AST when checking required columns. [#3930](https://github.com/yandex/ClickHouse/pull/3930) ([4ertus2](https://github.com/4ertus2))
* Added zero left padding to PODArray so that the -1 element is always valid and zeroed. It's used for branchless Offset access. [#3920](https://github.com/yandex/ClickHouse/pull/3920) ([amosbird](https://github.com/amosbird))
* Performance improvement for int serialization. [#3968](https://github.com/yandex/ClickHouse/pull/3968) ([amosbird](https://github.com/amosbird))
* Moved debian/ specific entries to debian/.gitignore [#4106](https://github.com/yandex/ClickHouse/pull/4106) ([gerasiov](https://github.com/gerasiov))
* Decreased the number of connections in case of large number of Distributed tables in a single server. [#3726](https://github.com/yandex/ClickHouse/pull/3726) ([zhang2014](https://github.com/zhang2014))
* Supported totals row for `WITH TOTALS` query for ODBC driver (ODBCDriver2 format). [#3836](https://github.com/yandex/ClickHouse/pull/3836) ([nightweb](https://github.com/nightweb))
* Better constant expression folding. Added the possibility to skip unused shards if the SELECT query filters by the sharding key (setting `distributed_optimize_skip_select_on_unused_shards`; see the sketch after this list). [#3851](https://github.com/yandex/ClickHouse/pull/3851) ([abyss7](https://github.com/abyss7))
* Do not log from odbc-bridge when there is no console. [#3857](https://github.com/yandex/ClickHouse/pull/3857) ([alesapin](https://github.com/alesapin))
* Forbid using aggregate functions inside scalar subqueries. [#3865](https://github.com/yandex/ClickHouse/pull/3865) ([abyss7](https://github.com/abyss7))
* Added the ability to use Enums as integers inside the `if` function. [#3875](https://github.com/yandex/ClickHouse/pull/3875) ([abyss7](https://github.com/abyss7))
* Added the `low_cardinality_allow_in_native_format` setting. If disabled, do not use the `LowCardinality` type in the Native format. [#3879](https://github.com/yandex/ClickHouse/pull/3879) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Removed duplicate code. [#3915](https://github.com/yandex/ClickHouse/pull/3915) ([sergey-v-galtsev](https://github.com/sergey-v-galtsev))
* Minor improvements in StorageKafka. [#3919](https://github.com/yandex/ClickHouse/pull/3919) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Automatically disable logs in negative tests. [#3940](https://github.com/yandex/ClickHouse/pull/3940) ([4ertus2](https://github.com/4ertus2))
* Refactored SyntaxAnalyzer. [#4014](https://github.com/yandex/ClickHouse/pull/4014) ([4ertus2](https://github.com/4ertus2))
* Reverted the jemalloc patch which led to performance degradation. [#4018](https://github.com/yandex/ClickHouse/pull/4018) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Refactored QueryNormalizer. Unified column sources for ASTIdentifier and ASTQualifiedAsterisk (were different), removed column duplicates for ASTQualifiedAsterisk sources, cleared asterisks replacement. [#4031](https://github.com/yandex/ClickHouse/pull/4031) ([4ertus2](https://github.com/4ertus2))
* Refactored code with ASTIdentifier. [#4056](https://github.com/yandex/ClickHouse/pull/4056) [#4077](https://github.com/yandex/ClickHouse/pull/4077) [#4087](https://github.com/yandex/ClickHouse/pull/4087) ([4ertus2](https://github.com/4ertus2))
* Improved the error message in the `clickhouse-test` script when no ClickHouse binary was found. [#4130](https://github.com/yandex/ClickHouse/pull/4130) ([Miniwoffer](https://github.com/Miniwoffer))
* Rewrote the code that calculates integer conversion function monotonicity. [#3921](https://github.com/yandex/ClickHouse/pull/3921) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed typos in comments. [#4089](https://github.com/yandex/ClickHouse/pull/4089) ([kvinty](https://github.com/kvinty))
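As referenced above, a sketch of how the shard-skipping optimization might be used (the Distributed table `dist_hits` and its sharding key are hypothetical):

```sql
SET distributed_optimize_skip_select_on_unused_shards = 1;

-- Assuming dist_hits is a Distributed table sharded by UserID,
-- this query can be sent only to the shard that owns UserID = 42.
SELECT count() FROM dist_hits WHERE UserID = 42;
```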
### Build/Testing/Packaging Improvements
* Added minimal support for PowerPC builds. [#4132](https://github.com/yandex/ClickHouse/pull/4132) ([danlark1](https://github.com/danlark1))
* Fixed error when the server cannot start with the `bash: /usr/bin/clickhouse-extract-from-config: Operation not permitted` message within Docker or systemd-nspawn. [#4136](https://github.com/yandex/ClickHouse/pull/4136) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Updated `mariadb-client` library. Fixed one of issues found by UBSan. [#3924](https://github.com/yandex/ClickHouse/pull/3924) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Some fixes for UBSan builds. [#3926](https://github.com/yandex/ClickHouse/pull/3926) [#3948](https://github.com/yandex/ClickHouse/pull/3948) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Moved Docker images to 18.10 and added a compatibility file for glibc >= 2.28. [#3965](https://github.com/yandex/ClickHouse/pull/3965) ([alesapin](https://github.com/alesapin))
* Added an env variable for users who don't want to chown directories in the server Docker image. [#3967](https://github.com/yandex/ClickHouse/pull/3967) ([alesapin](https://github.com/alesapin))
* Stateful functional tests are run on a publicly available dataset. [#3969](https://github.com/yandex/ClickHouse/pull/3969) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Enabled most of the warnings from `-Weverything` in clang. Enabled `-Wpedantic`. [#3986](https://github.com/yandex/ClickHouse/pull/3986) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Link to libLLVM rather than to individual LLVM libs when USE_STATIC_LIBRARIES is off. [#3989](https://github.com/yandex/ClickHouse/pull/3989) ([orivej](https://github.com/orivej))
* Added a few more warnings that are available only in clang 8. [#3993](https://github.com/yandex/ClickHouse/pull/3993) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed bugs found by PVS-Studio. [#4013](https://github.com/yandex/ClickHouse/pull/4013) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Added sanitizer variables for test images. [#4072](https://github.com/yandex/ClickHouse/pull/4072) ([alesapin](https://github.com/alesapin))
* clickhouse-server debian package will recommend `libcap2-bin` package to use `setcap` tool for setting capabilities. This is optional. [#4093](https://github.com/yandex/ClickHouse/pull/4093) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Improved compilation time, fixed includes. [#3898](https://github.com/yandex/ClickHouse/pull/3898) ([proller](https://github.com/proller))
* Added performance tests for hash functions. [#3918](https://github.com/yandex/ClickHouse/pull/3918) ([filimonov](https://github.com/filimonov))
* Fixed cyclic library dependencies. [#3958](https://github.com/yandex/ClickHouse/pull/3958) ([proller](https://github.com/proller))
* Improved compilation with low available memory. [#4030](https://github.com/yandex/ClickHouse/pull/4030) ([proller](https://github.com/proller))
### Bug Fixes
* Fixed a bug in `remote` table function execution when wrong restrictions were used in `getStructureOfRemoteTable`. [#4009](https://github.com/yandex/ClickHouse/pull/4009) ([alesapin](https://github.com/alesapin))
* Fix a leak of netlink sockets. They were placed in a pool where they were never deleted and new sockets were created at the start of a new thread when all current sockets were in use. [#4017](https://github.com/yandex/ClickHouse/pull/4017) ([ztlpn](https://github.com/ztlpn))
* Fixed a regression in master: an "Unknown identifier" error when column names appear in lambdas. [#4115](https://github.com/yandex/ClickHouse/pull/4115) ([4ertus2](https://github.com/4ertus2))
* Fixed a bug where `/proc/self/fd` was closed before all fds were read from `/proc`. [#4120](https://github.com/yandex/ClickHouse/pull/4120) ([alesapin](https://github.com/alesapin))
* Fixed misspells in **comments** and **string literals** under `dbms`. [#4122](https://github.com/yandex/ClickHouse/pull/4122) ([maiha](https://github.com/maiha))
* Fixed String-to-UInt monotonic conversion when a String is used in the primary key. [#3870](https://github.com/yandex/ClickHouse/pull/3870) ([zhang2014](https://github.com/zhang2014))
* Added a check that the `SET send_logs_level = value` query accepts an appropriate value. [#3873](https://github.com/yandex/ClickHouse/pull/3873) ([s-mx](https://github.com/s-mx))
* Fixed a race condition when executing a distributed ALTER task. The race condition led to more than one replica trying to execute the task and all replicas except one failing with a ZooKeeper error. [#3904](https://github.com/yandex/ClickHouse/pull/3904) ([ztlpn](https://github.com/ztlpn))
* Fixed segfault in `arrayEnumerateUniq`, `arrayEnumerateDense` functions in case of some invalid arguments. [#3909](https://github.com/yandex/ClickHouse/pull/3909) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fix UB in StorageMerge. [#3910](https://github.com/yandex/ClickHouse/pull/3910) ([amosbird](https://github.com/amosbird))
* Fixed segfault in functions `addDays`, `subtractDays`. [#3913](https://github.com/yandex/ClickHouse/pull/3913) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed error: functions `round`, `floor`, `trunc`, `ceil` may return bogus result when executed on integer argument and large negative scale. [#3914](https://github.com/yandex/ClickHouse/pull/3914) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed a bug introduced by 'kill query sync' which led to a core dump. [#3916](https://github.com/yandex/ClickHouse/pull/3916) ([fancyqlx](https://github.com/fancyqlx))
* Fixed a bug causing a long delay after the replication queue became empty. [#3928](https://github.com/yandex/ClickHouse/pull/3928) ([alesapin](https://github.com/alesapin))
* Don't do exponential backoff when there is nothing to do for the task. [#3932](https://github.com/yandex/ClickHouse/pull/3932) ([alesapin](https://github.com/alesapin))
* Fix a bug that led to hangups in threads that perform ALTERs of Replicated tables and in the thread that updates configuration from ZooKeeper. #2947 #3891 [#3934](https://github.com/yandex/ClickHouse/pull/3934) ([ztlpn](https://github.com/ztlpn))
* Fixed error in internal implementation of `quantileTDigest` (found by Artem Vakhrushev). This error never happens in ClickHouse and was relevant only for those who use ClickHouse codebase as a library directly. [#3935](https://github.com/yandex/ClickHouse/pull/3935) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed a bug with a wrong prefix for IPv4 subnet masks. [#3945](https://github.com/yandex/ClickHouse/pull/3945) ([alesapin](https://github.com/alesapin))
* Fix a bug when `from_zk` config elements weren't refreshed after a request to ZooKeeper timed out. #2947 [#3947](https://github.com/yandex/ClickHouse/pull/3947) ([ztlpn](https://github.com/ztlpn))
* Fixed dictionary copying in the `LowCardinality::cloneEmpty()` method, which led to excessive memory usage when inserting into a table with a LowCardinality primary key. [#3955](https://github.com/yandex/ClickHouse/pull/3955) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Fixed crash (`std::terminate`) in rare cases when a new thread cannot be created due to exhausted resources. [#3956](https://github.com/yandex/ClickHouse/pull/3956) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed user and password forwarding for queries to replicated tables. [#3957](https://github.com/yandex/ClickHouse/pull/3957) ([alesapin](https://github.com/alesapin))
* Fixed very rare race condition that can happen when listing tables in Dictionary database while reloading dictionaries. [#3970](https://github.com/yandex/ClickHouse/pull/3970) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed LowCardinality serialization for Native format in case of empty arrays. #3907 [#4011](https://github.com/yandex/ClickHouse/pull/4011) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Fixed incorrect result while using distinct by single LowCardinality numeric column. #3895 [#4012](https://github.com/yandex/ClickHouse/pull/4012) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Made the `compiled_expression_cache_size` setting limited by default. [#4041](https://github.com/yandex/ClickHouse/pull/4041) ([alesapin](https://github.com/alesapin))
* Fixed a UBSan bug in compression codecs. [#4069](https://github.com/yandex/ClickHouse/pull/4069) ([alesapin](https://github.com/alesapin))
* Allow Kafka Engine to ignore some number of parsing errors per block. [#4094](https://github.com/yandex/ClickHouse/pull/4094) ([abyss7](https://github.com/abyss7))
* Fixed glibc compatibility issues. [#4100](https://github.com/yandex/ClickHouse/pull/4100) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed issues found by PVS-Studio. [#4103](https://github.com/yandex/ClickHouse/pull/4103) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed the way array join columns are collected. [#4121](https://github.com/yandex/ClickHouse/pull/4121) ([4ertus2](https://github.com/4ertus2))
* Fixed incorrect result when HAVING was used with ROLLUP or CUBE. [#3756](https://github.com/yandex/ClickHouse/issues/3756) [#3837](https://github.com/yandex/ClickHouse/pull/3837) ([reflection](https://github.com/reflection))
* Fixed specialized aggregation with LowCardinality key (in case when `compile` setting is enabled). [#3886](https://github.com/yandex/ClickHouse/pull/3886) ([KochetovNicolai](https://github.com/KochetovNicolai))
* Fixed data type check in type conversion functions. [#3896](https://github.com/yandex/ClickHouse/pull/3896) ([zhang2014](https://github.com/zhang2014))
* Fixed column aliases for query with `JOIN ON` syntax and distributed tables. [#3980](https://github.com/yandex/ClickHouse/pull/3980) ([zhang2014](https://github.com/zhang2014))
* Fixed issues detected by UBSan. [#3021](https://github.com/yandex/ClickHouse/pull/3021) ([alexey-milovidov](https://github.com/alexey-milovidov))
### Doc fixes
* Translated the table-engines-related part of the documentation to Chinese. [#3844](https://github.com/yandex/ClickHouse/pull/3844) ([lamber-ken](https://github.com/lamber-ken))
* Fixed `toStartOfFiveMinute` description. [#4096](https://github.com/yandex/ClickHouse/pull/4096) ([cheesedosa](https://github.com/cheesedosa))
* Added description for client `--secure` argument. [#3961](https://github.com/yandex/ClickHouse/pull/3961) ([vicdashkov](https://github.com/vicdashkov))
* Added descriptions for the settings `merge_tree_uniform_read_distribution`, `merge_tree_min_rows_for_concurrent_read`, `merge_tree_min_rows_for_seek`, `merge_tree_coarse_index_granularity`, `merge_tree_max_rows_to_use_cache`. [#4024](https://github.com/yandex/ClickHouse/pull/4024) ([BayoNet](https://github.com/BayoNet))
* Minor doc fixes. [#4098](https://github.com/yandex/ClickHouse/pull/4098) ([blinkov](https://github.com/blinkov))
* Updated example for zookeeper config setting. [#3883](https://github.com/yandex/ClickHouse/pull/3883) [#3894](https://github.com/yandex/ClickHouse/pull/3894) ([ogorbacheva](https://github.com/ogorbacheva))
* Updated info about escaping in formats Vertical, Pretty and VerticalRaw. [#4118](https://github.com/yandex/ClickHouse/pull/4118) ([ogorbacheva](https://github.com/ogorbacheva))
* Added descriptions of the functions for working with UUID. [#4059](https://github.com/yandex/ClickHouse/pull/4059) ([ogorbacheva](https://github.com/ogorbacheva))
* Added the description of the `CHECK TABLE` query. [#3881](https://github.com/yandex/ClickHouse/pull/3881) [#4043](https://github.com/yandex/ClickHouse/pull/4043) ([ogorbacheva](https://github.com/ogorbacheva))
* Translated the `zh/tests` doc to Chinese. [#4034](https://github.com/yandex/ClickHouse/pull/4034) ([sundy-li](https://github.com/sundy-li))
* Added documentation about functions `multiPosition`, `firstMatch`, `multiSearch`. [#4123](https://github.com/yandex/ClickHouse/pull/4123) ([danlark1](https://github.com/danlark1))
* Added the Puppet module to the list of third-party libraries. [#3862](https://github.com/yandex/ClickHouse/pull/3862) ([Felixoid](https://github.com/Felixoid))
* Fixed a typo in the English version of the Creating a Table example. [#3872](https://github.com/yandex/ClickHouse/pull/3872) ([areldar](https://github.com/areldar))
* Mentioned the Nagios plugin for ClickHouse. [#3878](https://github.com/yandex/ClickHouse/pull/3878) ([lisuml](https://github.com/lisuml))
* Update of query language syntax description. [#4065](https://github.com/yandex/ClickHouse/pull/4065) ([BayoNet](https://github.com/BayoNet))
* Added documentation for per-column compression codecs. [#4073](https://github.com/yandex/ClickHouse/pull/4073) ([alex-krash](https://github.com/alex-krash))
* Updated the articles about CollapsingMergeTree, GraphiteMergeTree, Replicated*MergeTree, and the `CREATE TABLE` query. [#4085](https://github.com/yandex/ClickHouse/pull/4085) ([BayoNet](https://github.com/BayoNet))
* Other minor improvements. [#3897](https://github.com/yandex/ClickHouse/pull/3897) [#3923](https://github.com/yandex/ClickHouse/pull/3923) [#4066](https://github.com/yandex/ClickHouse/pull/4066) [#3860](https://github.com/yandex/ClickHouse/pull/3860) [#3906](https://github.com/yandex/ClickHouse/pull/3906) [#3936](https://github.com/yandex/ClickHouse/pull/3936) [#3975](https://github.com/yandex/ClickHouse/pull/3975) ([ogorbacheva](https://github.com/ogorbacheva)) ([blinkov](https://github.com/blinkov)) ([sdk2](https://github.com/sdk2))
### Other
* Updated librdkafka to v1.0.0-RC5. Used cppkafka instead of raw C interface. [#4025](https://github.com/yandex/ClickHouse/pull/4025) ([abyss7](https://github.com/abyss7))
* Fixed `hidden` on page title [#4033](https://github.com/yandex/ClickHouse/pull/4033) ([xboston](https://github.com/xboston))
* Updated year in copyright to 2019. [#4039](https://github.com/yandex/ClickHouse/pull/4039) ([xboston](https://github.com/xboston))
* Added a check that the server process is started by the owner of the data directory. Do not start the server as root. [#3785](https://github.com/yandex/ClickHouse/pull/3785) ([sergey-v-galtsev](https://github.com/sergey-v-galtsev))
* Removed function `shardByHash`. [#3833](https://github.com/yandex/ClickHouse/pull/3833) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed typo in ClusterCopier. [#3854](https://github.com/yandex/ClickHouse/pull/3854) ([dqminh](https://github.com/dqminh))
* Minor grammar fixes. [#3855](https://github.com/yandex/ClickHouse/pull/3855) ([intgr](https://github.com/intgr))
* Added test script to reproduce performance degradation in jemalloc. [#4036](https://github.com/yandex/ClickHouse/pull/4036) ([alexey-milovidov](https://github.com/alexey-milovidov))
## ClickHouse release 18.16.1, 2018-12-21
### Bug fixes:


@@ -81,7 +81,7 @@ option (ENABLE_TESTS "Enables tests" ON)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (OS_LINUX AND NOT UNBUNDLED)
if (OS_LINUX AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES)
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
@@ -232,6 +232,7 @@ include (cmake/find_llvm.cmake)
include (cmake/find_cpuid.cmake)
include (cmake/find_libgsasl.cmake)
include (cmake/find_libxml2.cmake)
include (cmake/find_protobuf.cmake)
include (cmake/find_hdfs3.cmake)
include (cmake/find_consistent-hashing.cmake)
include (cmake/find_base64.cmake)


@@ -10,3 +10,7 @@ ClickHouse is an open-source column-oriented database management system that all
* [Blog](https://clickhouse.yandex/blog/en/) contains various ClickHouse-related articles, as well as announces and reports about events.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [C++ ClickHouse and CatBoost Sprints](https://events.yandex.ru/events/ClickHouse/2-feb-2019/) in Moscow on February 2.


@@ -25,9 +25,10 @@ Various possible options. We are not going to automate testing all of them.
#### CPU architectures:
- x86_64;
- AArch64.
- AArch64;
- PowerPC64LE.
x86_64 is the main CPU architecture. We also have minimal support for AArch64.
x86_64 is the main CPU architecture. We also have minimal support for AArch64 and PowerPC64LE.
#### Operating systems:
- Linux;


@@ -24,3 +24,10 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (COMPILER_CLANG 1)
endif ()
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(ppc64le.*|PPC64LE.*)")
set (ARCH_PPC64LE 1)
if (COMPILER_CLANG OR (COMPILER_GCC AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8))
message(FATAL_ERROR "Only gcc-8 is supported for powerpc architecture")
endif ()
endif ()


@@ -15,6 +15,11 @@ endif ()
if (NOT GTEST_INCLUDE_DIRS AND NOT MISSING_INTERNAL_GTEST_LIBRARY)
set (USE_INTERNAL_GTEST_LIBRARY 1)
set (GTEST_MAIN_LIBRARIES gtest_main)
set (GTEST_INCLUDE_DIRS ${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest)
endif ()
message (STATUS "Using gtest: ${GTEST_INCLUDE_DIRS} : ${GTEST_MAIN_LIBRARIES}")
if(GTEST_INCLUDE_DIRS AND GTEST_MAIN_LIBRARIES)
set(USE_GTEST 1)
endif()
message (STATUS "Using gtest=${USE_GTEST}: ${GTEST_INCLUDE_DIRS} : ${GTEST_MAIN_LIBRARIES}")


@@ -1,18 +1,29 @@
option (USE_INTERNAL_PROTOBUF_LIBRARY "Set to FALSE to use system protobuf instead of bundled" ON)
option(USE_INTERNAL_PROTOBUF_LIBRARY "Set to FALSE to use system protobuf instead of bundled" ${NOT_UNBUNDLED})
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/protobuf/cmake/CMakeLists.txt")
if(USE_INTERNAL_PROTOBUF_LIBRARY)
message(WARNING "submodule contrib/protobuf is missing. to fix try run: \n git submodule update --init --recursive")
set(USE_INTERNAL_PROTOBUF_LIBRARY 0)
endif()
set(MISSING_INTERNAL_PROTOBUF_LIBRARY 1)
endif()
if(NOT USE_INTERNAL_PROTOBUF_LIBRARY)
find_package(Protobuf)
endif()
if (Protobuf_LIBRARY AND Protobuf_INCLUDE_DIR)
else ()
set(Protobuf_INCLUDE_DIR ${CMAKE_SOURCE_DIR}/contrib/protobuf/src)
set(USE_PROTOBUF 1)
elseif(NOT MISSING_INTERNAL_PROTOBUF_LIBRARY)
set(Protobuf_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/protobuf/src)
set(USE_PROTOBUF 1)
set(USE_INTERNAL_PROTOBUF_LIBRARY 1)
set(Protobuf_LIBRARY libprotobuf)
set(Protobuf_PROTOC_LIBRARY libprotoc)
set(Protobuf_LITE_LIBRARY libprotobuf-lite)
set(Protobuf_PROTOC_EXECUTABLE ${CMAKE_BINARY_DIR}/contrib/protobuf/cmake/protoc)
set(Protobuf_PROTOC_EXECUTABLE ${ClickHouse_BINARY_DIR}/contrib/protobuf/cmake/protoc)
if(NOT DEFINED PROTOBUF_GENERATE_CPP_APPEND_PATH)
set(PROTOBUF_GENERATE_CPP_APPEND_PATH TRUE)
@@ -77,4 +88,4 @@ else ()
endfunction()
endif()
message (STATUS "Using protobuf: ${Protobuf_INCLUDE_DIR} : ${Protobuf_LIBRARY}")
message(STATUS "Using protobuf=${USE_PROTOBUF}: ${Protobuf_INCLUDE_DIR} : ${Protobuf_LIBRARY}")


@@ -2,6 +2,11 @@ if (NOT ARCH_ARM AND NOT ARCH_32 AND NOT APPLE)
option (ENABLE_RDKAFKA "Enable kafka" ON)
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cppkafka/CMakeLists.txt")
message (WARNING "submodule contrib/cppkafka is missing. to fix try run: \n git submodule update --init --recursive")
set (ENABLE_RDKAFKA 0)
endif ()
if (ENABLE_RDKAFKA)
if (OS_LINUX AND NOT ARCH_ARM)


@@ -14,6 +14,7 @@ if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
else ()
set (USE_INTERNAL_ZSTD_LIBRARY 1)
set (ZSTD_LIBRARY zstd)
set (ZSTD_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/zstd/lib)
endif ()
message (STATUS "Using zstd: ${ZSTD_INCLUDE_DIR} : ${ZSTD_LIBRARY}")


@@ -27,6 +27,9 @@ if (HAVE_SSE41)
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${TEST_FLAG}")
endif ()
if (ARCH_PPC64LE)
set (COMPILER_FLAGS "${COMPILER_FLAGS} -maltivec -D__SSE2__=1 -DNO_WARN_X86_INTRINSICS")
endif ()
# gcc -dM -E -msse4.2 - < /dev/null | sort > gcc-dump-sse42
#define __SSE4_2__ 1


@@ -207,14 +207,14 @@ if (USE_INTERNAL_LIBXML2_LIBRARY)
add_subdirectory(libxml2-cmake)
endif ()
if (USE_INTERNAL_HDFS3_LIBRARY)
include(${ClickHouse_SOURCE_DIR}/cmake/find_protobuf.cmake)
if (USE_INTERNAL_PROTOBUF_LIBRARY)
set(protobuf_BUILD_TESTS OFF CACHE INTERNAL "" FORCE)
set(protobuf_BUILD_SHARED_LIBS OFF CACHE INTERNAL "" FORCE)
set(protobuf_WITH_ZLIB 0 CACHE INTERNAL "" FORCE) # actually will use zlib, but skip find
add_subdirectory(protobuf/cmake)
endif ()
if (USE_INTERNAL_HDFS3_LIBRARY)
add_subdirectory(libhdfs3-cmake)
endif ()


@@ -341,6 +341,13 @@ static inline __m128i libdivide_get_0000FFFF(void) {
#pragma clang diagnostic pop
#endif
/// This is a bug in gcc-8, _MM_SHUFFLE was forgotten, though in trunk it is ok https://github.com/gcc-mirror/gcc/blob/master/gcc/config/rs6000/xmmintrin.h#L61
#if defined(__PPC__)
#ifndef _MM_SHUFFLE
#define _MM_SHUFFLE(w,x,y,z) (((w) << 6) | ((x) << 4) | ((y) << 2) | (z))
#endif
#endif
static inline __m128i libdivide_s64_signbits(__m128i v) {
//we want to compute v >> 63, that is, _mm_srai_epi64(v, 63). But there is no 64 bit shift right arithmetic instruction in SSE2. So we have to fake it by first duplicating the high 32 bit values, and then using a 32 bit shift. Another option would be to use _mm_srli_epi64(v, 63) and then subtract that from 0, but that approach appears to be substantially slower for unknown reasons
__m128i hiBitsDuped = _mm_shuffle_epi32(v, _MM_SHUFFLE(3, 3, 1, 1));


@@ -7,9 +7,9 @@ CHECK_FUNCTION_EXISTS(nanosleep HAVE_NANOSLEEP)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-strict-aliasing")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-strict-aliasing")
IF(ENABLE_SSE STREQUAL ON)
IF(ENABLE_SSE STREQUAL ON AND NOT ARCH_PPC64LE AND NOT ARCH_AARCH64 AND NOT ARCH_ARM)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")
ENDIF(ENABLE_SSE STREQUAL ON)
ENDIF(ENABLE_SSE STREQUAL ON AND NOT ARCH_PPC64LE AND NOT ARCH_AARCH64 AND NOT ARCH_ARM)
IF(NOT TEST_HDFS_PREFIX)
SET(TEST_HDFS_PREFIX "./" CACHE STRING "default directory prefix used for test." FORCE)


@@ -2,6 +2,7 @@ set(RDKAFKA_SOURCE_DIR ${CMAKE_SOURCE_DIR}/contrib/librdkafka/src)
set(SRCS
${RDKAFKA_SOURCE_DIR}/crc32c.c
${RDKAFKA_SOURCE_DIR}/rdkafka_zstd.c
${RDKAFKA_SOURCE_DIR}/rdaddr.c
${RDKAFKA_SOURCE_DIR}/rdavl.c
${RDKAFKA_SOURCE_DIR}/rdbuf.c
@@ -59,5 +60,6 @@ set(SRCS
add_library(rdkafka ${LINK_MODE} ${SRCS})
target_include_directories(rdkafka SYSTEM PUBLIC include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR})
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR}) # Because weird logic with "include_next" is used.
target_include_directories(rdkafka SYSTEM PRIVATE ${ZSTD_INCLUDE_DIR}/common) # Because wrong path to "zstd_errors.h" is used.
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${ZSTD_LIBRARY} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})


@@ -51,6 +51,8 @@
//#define WITH_PLUGINS 1
// zlib
#define WITH_ZLIB 1
// zstd
#define WITH_ZSTD 1
// WITH_SNAPPY
#define WITH_SNAPPY 1
// WITH_SOCKEM
@@ -60,7 +62,9 @@
// WITH_SASL_SCRAM
//#define WITH_SASL_SCRAM 1
// crc32chw
#if !defined(__PPC__)
#define WITH_CRC32C_HW 1
#endif
// regex
#define HAVE_REGEX 1
// strndup


@@ -1,13 +1,11 @@
enable_language(ASM)
if (ARCH_PPC64LE)
add_library(unwind
src/mi/init.c
src/mi/flush_cache.c
src/mi/mempool.c
src/mi/strerror.c
src/x86_64/is_fpreg.c
src/x86_64/regname.c
src/mi/_ReadULEB.c
src/mi/_ReadSLEB.c
src/mi/backtrace.c
@@ -26,6 +24,70 @@ src/mi/Lset_reg.c
src/mi/Lget_fpreg.c
src/mi/Lset_fpreg.c
src/mi/Lset_caching_policy.c
src/dwarf/Lexpr.c
src/dwarf/Lfde.c
src/dwarf/Lfind_proc_info-lsb.c
src/dwarf/Lparser.c
src/dwarf/Lpe.c
src/dwarf/global.c
src/elf64.c
src/os-linux.c
src/ppc64/is_fpreg.c
src/ppc64/regname.c
src/ppc64/get_func_addr.c
src/ppc/Linit_local.c
src/ppc/Linit_remote.c
src/ppc/Lis_signal_frame.c
src/ppc/longjmp.S
src/ppc/Lreg_states_iterate.c
src/ppc/siglongjmp.S
src/ppc64/setcontext.S
src/ppc64/Lcreate_addr_space.c
src/ppc64/Lglobal.c
src/ppc64/Linit.c
src/ppc64/Lreg_states_iterate.c
src/ppc64/Lregs.c
src/ppc64/Lresume.c
src/ppc64/Lstep.c
src/ppc64/regname.c
src/ppc64/setcontext.S
)
else ()
add_library(unwind
src/mi/init.c
src/mi/flush_cache.c
src/mi/mempool.c
src/mi/strerror.c
src/mi/_ReadULEB.c
src/mi/_ReadSLEB.c
src/mi/backtrace.c
src/mi/dyn-cancel.c
src/mi/dyn-info-list.c
src/mi/dyn-register.c
src/mi/Ldyn-extract.c
src/mi/Lfind_dynamic_proc_info.c
src/mi/Lget_accessors.c
src/mi/Lget_proc_info_by_ip.c
src/mi/Lget_proc_name.c
src/mi/Lput_dynamic_unwind_info.c
src/mi/Ldestroy_addr_space.c
src/mi/Lget_reg.c
src/mi/Lset_reg.c
src/mi/Lget_fpreg.c
src/mi/Lset_fpreg.c
src/mi/Lset_caching_policy.c
src/dwarf/Lexpr.c
src/dwarf/Lfde.c
src/dwarf/Lfind_proc_info-lsb.c
src/dwarf/Lparser.c
src/dwarf/Lpe.c
src/dwarf/global.c
src/elf64.c
src/os-linux.c
src/x86_64/is_fpreg.c
src/x86_64/regname.c
src/x86_64/setcontext.S
src/x86_64/Lcreate_addr_space.c
src/x86_64/Lget_save_loc.c
@@ -40,17 +102,9 @@ src/x86_64/Lstash_frame.c
src/x86_64/Lstep.c
src/x86_64/Ltrace.c
src/x86_64/getcontext.S
src/dwarf/Lexpr.c
src/dwarf/Lfde.c
src/dwarf/Lfind_proc_info-lsb.c
src/dwarf/Lparser.c
src/dwarf/Lpe.c
src/dwarf/global.c
src/elf64.c
src/os-linux.c
src/x86_64/Los-linux.c
)
endif()
find_file (HAVE_ATOMIC_OPS_H "atomic_ops.h")
configure_file (config/config.h.in ${CMAKE_CURRENT_BINARY_DIR}/config/config.h)


@@ -37,6 +37,7 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
#include <stdlib.h>
#include <libunwind.h>
#include <libunwind-ppc64.h>
#include "elf64.h"
#include "mempool.h"


@@ -25,7 +25,7 @@ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
#include "unwind_i.h"
#include "../ppc64/unwind_i.h"
PROTECTED int
unw_reg_states_iterate (unw_cursor_t *cursor,


@@ -298,6 +298,11 @@ target_link_libraries(dbms PRIVATE ${OPENSSL_CRYPTO_LIBRARY} Threads::Threads)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
if (USE_PROTOBUF)
target_link_libraries (dbms PRIVATE ${Protobuf_LIBRARY})
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${Protobuf_INCLUDE_DIR})
endif ()
if (USE_HDFS)
target_link_libraries (clickhouse_common_io PRIVATE ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
@@ -318,7 +323,7 @@ target_include_directories (clickhouse_common_io BEFORE PRIVATE ${COMMON_INCLUDE
add_subdirectory (programs)
add_subdirectory (tests)
if (ENABLE_TESTS)
if (ENABLE_TESTS AND USE_GTEST)
macro (grep_gtest_sources BASE_DIR DST_VAR)
# Could match files that are not in tests/ directories
file(GLOB_RECURSE "${DST_VAR}" RELATIVE "${BASE_DIR}" "gtest*.cpp")


@@ -2,10 +2,10 @@
set(VERSION_REVISION 54413)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 1)
set(VERSION_PATCH 5)
set(VERSION_GITHASH 2a7e7364c139b3c97f54f38ca6ea76ab4fa61e4b)
set(VERSION_DESCRIBE v19.1.5-testing)
set(VERSION_STRING 19.1.5)
set(VERSION_PATCH 6)
set(VERSION_GITHASH f73b337a93d534671b2187660398b8573fc1d464)
set(VERSION_DESCRIBE v19.1.6-testing)
set(VERSION_STRING 19.1.6)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")


@@ -56,6 +56,7 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
#include <Functions/registerFunctions.h>
@@ -219,6 +220,9 @@ private:
APPLY_FOR_SETTINGS(EXTRACT_SETTING)
#undef EXTRACT_SETTING
/// Set path for format schema files
if (config().has("format_schema_path"))
context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
}
@@ -1206,6 +1210,10 @@ private:
const auto & id = typeid_cast<const ASTIdentifier &>(*query_with_output->format);
current_format = id.name;
}
if (query_with_output->settings_ast)
{
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
}
}
if (has_vertical_output_suffix)


@@ -46,6 +46,8 @@
namespace fs = boost::filesystem;
using String = std::string;
const String FOUR_SPACES = "    ";
const std::regex QUOTE_REGEX{"\""};
const std::regex NEW_LINE{"\n"};
namespace DB
{
@@ -80,7 +82,7 @@ public:
bool reserved = (value[0] == '[' || value[0] == '{' || value == "null");
if (!reserved && wrap)
value = '"' + value + '"';
value = '"' + std::regex_replace(value, NEW_LINE, "\\n") + '"';
content[key] = value;
}
@@ -579,7 +581,8 @@ private:
using Paths = std::vector<String>;
using StringToVector = std::map<String, std::vector<String>>;
StringToVector substitutions;
using StringToMap = std::map<String, StringToVector>;
StringToMap substitutions;
using StringKeyValue = std::map<String, String>;
std::vector<StringKeyValue> substitutions_maps;
@@ -933,13 +936,13 @@ private:
{
/// Make "subconfig" of inner xml block
ConfigurationPtr substitutions_view(test_config->createView("substitutions"));
constructSubstitutions(substitutions_view, substitutions);
constructSubstitutions(substitutions_view, substitutions[test_name]);
auto queries_pre_format = queries;
queries.clear();
for (const auto & query : queries_pre_format)
{
auto formatted = formatQueries(query, substitutions);
auto formatted = formatQueries(query, substitutions[test_name]);
queries.insert(queries.end(), formatted.begin(), formatted.end());
}
}
@@ -994,6 +997,9 @@ private:
}
else
{
if (metrics.empty())
throw DB::Exception("You should specify at least one metric", DB::ErrorCodes::BAD_ARGUMENTS);
main_metric = metrics[0];
if (lite_output)
throw DB::Exception("Specify main_metric for lite output", DB::ErrorCodes::BAD_ARGUMENTS);
}
@@ -1219,11 +1225,11 @@ public:
json_output.set("test_name", test_name);
json_output.set("main_metric", main_metric);
if (substitutions.size())
if (substitutions[test_name].size())
{
JSONString json_parameters(2); /// here, 2 is the size of \t padding
for (auto it = substitutions.begin(); it != substitutions.end(); ++it)
for (auto it = substitutions[test_name].begin(); it != substitutions[test_name].end(); ++it)
{
String parameter = it->first;
std::vector<String> values = it->second;
@@ -1231,7 +1237,7 @@ public:
String array_string = "[";
for (size_t i = 0; i != values.size(); ++i)
{
array_string += '"' + values[i] + '"';
array_string += '"' + std::regex_replace(values[i], QUOTE_REGEX, "\\\"") + '"';
if (i != values.size() - 1)
{
array_string += ", ";
@@ -1257,7 +1263,7 @@ public:
JSONString runJSON;
runJSON.set("query", queries[query_index]);
runJSON.set("query", std::regex_replace(queries[query_index], QUOTE_REGEX, "\\\""));
if (!statistics.exception.empty())
runJSON.set("exception", statistics.exception);


@@ -23,7 +23,7 @@ if (CLICKHOUSE_SPLIT_BINARY)
install (TARGETS clickhouse-server ${CLICKHOUSE_ALL_TARGETS} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
endif ()
if (OS_LINUX AND MAKE_STATIC_LIBRARIES)
if (GLIBC_COMPATIBILITY)
set (GLIBC_MAX_REQUIRED 2.4 CACHE INTERNAL "")
# temporary disabled. to enable - change 'exit 0' to 'exit $a'
add_test(NAME GLIBC_required_version COMMAND bash -c "readelf -s ${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-server | perl -nE 'END {exit 0 if $a} ++$a, print if /\\x40GLIBC_(\\S+)/ and pack(q{C*}, split /\\./, \$1) gt pack q{C*}, split /\\./, q{${GLIBC_MAX_REQUIRED}}'")


@@ -31,7 +31,7 @@
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Quota.h>


@@ -418,7 +418,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Set path for format schema files
auto format_schema_path = Poco::File(config().getString("format_schema_path", path + "format_schemas/"));
global_context->setFormatSchemaPath(format_schema_path.path() + "/");
global_context->setFormatSchemaPath(format_schema_path.path());
format_schema_path.createDirectories();
LOG_INFO(log, "Loading metadata.");


@@ -484,21 +484,16 @@ void TCPHandler::processTablesStatusRequest()
void TCPHandler::sendProfileInfo()
{
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(state.io.in.get()))
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
input->getProfileInfo().write(*out);
state.io.in->getProfileInfo().write(*out);
out->next();
}
}
void TCPHandler::sendTotals()
{
if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(state.io.in.get()))
{
const Block & totals = input->getTotals();
const Block & totals = state.io.in->getTotals();
if (totals)
{
@@ -512,14 +507,11 @@ void TCPHandler::sendTotals()
out->next();
}
}
}
void TCPHandler::sendExtremes()
{
if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(state.io.in.get()))
{
Block extremes = input->getExtremes();
Block extremes = state.io.in->getExtremes();
if (extremes)
{
@@ -533,7 +525,6 @@ void TCPHandler::sendExtremes()
out->next();
}
}
}
void TCPHandler::receiveHello()


@@ -100,7 +100,7 @@ public:
return res;
}
void NO_SANITIZE_UNDEFINED add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
/// Out of range conversion may occur. This is Ok.
@@ -177,8 +177,11 @@ public:
static void assertSecondArg(const DataTypes & argument_types)
{
if constexpr (has_second_arg)
/// TODO: check that second argument is of numerical type.
{
assertBinary(Name::name, argument_types);
if (!isUnsignedInteger(argument_types[1]))
throw Exception("Second argument (weight) for function " + std::string(Name::name) + " must be unsigned integer, but it has type " + argument_types[1]->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else
assertUnary(Name::name, argument_types);
}


@@ -12,10 +12,41 @@ namespace DB
namespace
{
AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, const DataTypes & arguments, const Array & params)
struct WithOverflowPolicy
{
assertNoParameters(name, params);
/// Overflow, meaning that the returned type is the same as the input type.
static DataTypePtr promoteType(const DataTypePtr & data_type) { return data_type; }
};
struct WithoutOverflowPolicy
{
/// No overflow, meaning we promote the types if necessary.
static DataTypePtr promoteType(const DataTypePtr & data_type)
{
if (!data_type->canBePromoted())
throw Exception{"Values to be summed are expected to be Numeric, Float or Decimal.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
return data_type->promoteNumericType();
}
};
template <typename T>
using SumMapWithOverflow = AggregateFunctionSumMap<T, WithOverflowPolicy>;
template <typename T>
using SumMapWithoutOverflow = AggregateFunctionSumMap<T, WithoutOverflowPolicy>;
template <typename T>
using SumMapFilteredWithOverflow = AggregateFunctionSumMapFiltered<T, WithOverflowPolicy>;
template <typename T>
using SumMapFilteredWithoutOverflow = AggregateFunctionSumMapFiltered<T, WithoutOverflowPolicy>;
using SumMapArgs = std::pair<DataTypePtr, DataTypes>;
SumMapArgs parseArguments(const std::string & name, const DataTypes & arguments)
{
if (arguments.size() < 2)
throw Exception("Aggregate function " + name + " requires at least two arguments of Array type.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@@ -25,9 +56,11 @@ AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, con
throw Exception("First argument for function " + name + " must be an array.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const DataTypePtr & keys_type = array_type->getNestedType();
DataTypePtr keys_type = array_type->getNestedType();
DataTypes values_types;
values_types.reserve(arguments.size() - 1);
for (size_t i = 1; i < arguments.size(); ++i)
{
array_type = checkAndGetDataType<DataTypeArray>(arguments[i].get());
@@ -37,20 +70,55 @@ AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, con
values_types.push_back(array_type->getNestedType());
}
AggregateFunctionPtr res(createWithNumericBasedType<AggregateFunctionSumMap>(*keys_type, keys_type, values_types));
return {std::move(keys_type), std::move(values_types)};
}
template <template <typename> class Function>
AggregateFunctionPtr createAggregateFunctionSumMap(const std::string & name, const DataTypes & arguments, const Array & params)
{
assertNoParameters(name, params);
auto [keys_type, values_types] = parseArguments(name, arguments);
AggregateFunctionPtr res(createWithNumericBasedType<Function>(*keys_type, keys_type, values_types));
if (!res)
res.reset(createWithDecimalType<AggregateFunctionSumMap>(*keys_type, keys_type, values_types));
res.reset(createWithDecimalType<Function>(*keys_type, keys_type, values_types));
if (!res)
throw Exception("Illegal type of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
template <template <typename> class Function>
AggregateFunctionPtr createAggregateFunctionSumMapFiltered(const std::string & name, const DataTypes & arguments, const Array & params)
{
if (params.size() != 1)
throw Exception("Aggregate function " + name + " requires exactly one parameter of Array type.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
Array keys_to_keep;
if (!params.front().tryGet<Array>(keys_to_keep))
throw Exception("Aggregate function " + name + " requires an Array as parameter.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto [keys_type, values_types] = parseArguments(name, arguments);
AggregateFunctionPtr res(createWithNumericBasedType<Function>(*keys_type, keys_type, values_types, keys_to_keep));
if (!res)
res.reset(createWithDecimalType<Function>(*keys_type, keys_type, values_types, keys_to_keep));
if (!res)
throw Exception("Illegal type of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
}
void registerAggregateFunctionSumMap(AggregateFunctionFactory & factory)
{
factory.registerFunction("sumMap", createAggregateFunctionSumMap);
factory.registerFunction("sumMap", createAggregateFunctionSumMap<SumMapWithoutOverflow>);
factory.registerFunction("sumMapWithOverflow", createAggregateFunctionSumMap<SumMapWithOverflow>);
factory.registerFunction("sumMapFiltered", createAggregateFunctionSumMapFiltered<SumMapFilteredWithoutOverflow>);
factory.registerFunction("sumMapFilteredWithOverflow", createAggregateFunctionSumMapFiltered<SumMapFilteredWithOverflow>);
}
}


@@ -50,9 +50,9 @@ struct AggregateFunctionSumMapData
* ([1,2,3,4,5,6,7,8,9,10],[10,10,45,20,35,20,15,30,20,20])
*/
template <typename T>
class AggregateFunctionSumMap final : public IAggregateFunctionDataHelper<
AggregateFunctionSumMapData<NearestFieldType<T>>, AggregateFunctionSumMap<T>>
template <typename T, typename Derived, typename OverflowPolicy>
class AggregateFunctionSumMapBase : public IAggregateFunctionDataHelper<
AggregateFunctionSumMapData<NearestFieldType<T>>, Derived>
{
private:
using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
@@ -61,7 +61,7 @@ private:
DataTypes values_types;
public:
AggregateFunctionSumMap(const DataTypePtr & keys_type, const DataTypes & values_types)
AggregateFunctionSumMapBase(const DataTypePtr & keys_type, const DataTypes & values_types)
: keys_type(keys_type), values_types(values_types) {}
String getName() const override { return "sumMap"; }
@@ -72,7 +72,7 @@ public:
types.emplace_back(std::make_shared<DataTypeArray>(keys_type));
for (const auto & value_type : values_types)
types.emplace_back(std::make_shared<DataTypeArray>(value_type));
types.emplace_back(std::make_shared<DataTypeArray>(OverflowPolicy::promoteType(value_type)));
return std::make_shared<DataTypeTuple>(types);
}
@@ -109,6 +109,11 @@ public:
array_column.getData().get(values_vec_offset + i, value);
const auto & key = keys_vec.getData()[keys_vec_offset + i];
if (!keepKey(key))
{
continue;
}
IteratorType it;
if constexpr (IsDecimalNumber<T>)
{
@@ -253,6 +258,52 @@ public:
}
const char * getHeaderFilePath() const override { return __FILE__; }
bool keepKey(const T & key) const { return static_cast<const Derived &>(*this).keepKey(key); }
};
template <typename T, typename OverflowPolicy>
class AggregateFunctionSumMap final :
public AggregateFunctionSumMapBase<T, AggregateFunctionSumMap<T, OverflowPolicy>, OverflowPolicy>
{
private:
using Self = AggregateFunctionSumMap<T, OverflowPolicy>;
using Base = AggregateFunctionSumMapBase<T, Self, OverflowPolicy>;
public:
AggregateFunctionSumMap(const DataTypePtr & keys_type, DataTypes & values_types)
: Base{keys_type, values_types}
{}
String getName() const override { return "sumMap"; }
bool keepKey(const T &) const { return true; }
};
template <typename T, typename OverflowPolicy>
class AggregateFunctionSumMapFiltered final :
public AggregateFunctionSumMapBase<T, AggregateFunctionSumMapFiltered<T, OverflowPolicy>, OverflowPolicy>
{
private:
using Self = AggregateFunctionSumMapFiltered<T, OverflowPolicy>;
using Base = AggregateFunctionSumMapBase<T, Self, OverflowPolicy>;
std::unordered_set<T> keys_to_keep;
public:
AggregateFunctionSumMapFiltered(const DataTypePtr & keys_type, const DataTypes & values_types, const Array & keys_to_keep_)
: Base{keys_type, values_types}
{
keys_to_keep.reserve(keys_to_keep_.size());
for (const Field & f : keys_to_keep_)
{
keys_to_keep.emplace(f.safeGet<NearestFieldType<T>>());
}
}
String getName() const override { return "sumMapFiltered"; }
bool keepKey(const T & key) const { return keys_to_keep.count(key); }
};
}
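For context, a usage sketch of the aggregate functions registered above (the data is made up; the expected result follows from the `keepKey` logic in this diff):

```sql
-- sumMapFiltered sums values per key, keeping only the listed keys.
SELECT sumMapFiltered([1, 4])(k, v)
FROM
(
    SELECT [1, 2, 3] AS k, [10, 10, 10] AS v
    UNION ALL
    SELECT [3, 4, 5] AS k, [10, 10, 10] AS v
);
-- Expected result: ([1, 4], [10, 10])

-- sumMapWithOverflow behaves like sumMap but keeps the input value type
-- (WithOverflowPolicy does not promote), so sums may wrap around.
```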


@@ -6,25 +6,25 @@ namespace DB
{
/** Data for HyperLogLogBiasEstimator in the uniqCombined function.
  * The development plan is as follows:
  * 1. Assemble ClickHouse.
  * 2. Run the script src/dbms/scripts/gen-bias-data.py, which returns one array for getRawEstimates()
  * and another array for getBiases().
  * 3. Update `raw_estimates` and `biases` arrays. Also update the size of arrays in InterpolatedData.
  * 4. Assemble ClickHouse.
  * 5. Run the script src/dbms/scripts/linear-counting-threshold.py, which creates 3 files:
  * - raw_graph.txt (1st column: the present number of unique values;
  * 2nd column: relative error in the case of HyperLogLog without applying any corrections)
  * - linear_counting_graph.txt (1st column: the present number of unique values;
  * 2nd column: relative error in the case of HyperLogLog using LinearCounting)
  * - bias_corrected_graph.txt (1st column: the present number of unique values;
  * 2nd column: relative error in the case of HyperLogLog with the use of corrections from the algorithm HyperLogLog++)
  * 6. Generate a graph with gnuplot based on this data.
  * 7. Determine the minimum number of unique values at which it is better to correct the error
  * using its evaluation (ie, using the HyperLogLog++ algorithm) than applying the LinearCounting algorithm.
  * 7. Accordingly, update the constant in the function getThreshold()
  * 8. Assemble ClickHouse.
  */
* The development plan is as follows:
* 1. Assemble ClickHouse.
* 2. Run the script src/dbms/scripts/gen-bias-data.py, which returns one array for getRawEstimates()
* and another array for getBiases().
* 3. Update `raw_estimates` and `biases` arrays. Also update the size of arrays in InterpolatedData.
* 4. Assemble ClickHouse.
* 5. Run the script src/dbms/scripts/linear-counting-threshold.py, which creates 3 files:
* - raw_graph.txt (1st column: the present number of unique values;
* 2nd column: relative error in the case of HyperLogLog without applying any corrections)
* - linear_counting_graph.txt (1st column: the present number of unique values;
* 2nd column: relative error in the case of HyperLogLog using LinearCounting)
* - bias_corrected_graph.txt (1st column: the present number of unique values;
* 2nd column: relative error in the case of HyperLogLog with the use of corrections from the algorithm HyperLogLog++)
* 6. Generate a graph with gnuplot based on this data.
* 7. Determine the minimum number of unique values at which it is better to correct the error
* using its evaluation (ie, using the HyperLogLog++ algorithm) than applying the LinearCounting algorithm.
* 8. Accordingly, update the constant in the function getThreshold().
* 9. Assemble ClickHouse.
*/
struct UniqCombinedBiasData
{
using InterpolatedData = std::array<double, 200>;


@@ -15,33 +15,33 @@
/** Approximate calculation of anything, as usual, is constructed according to the following scheme:
  * - some data structure is used to calculate the value of X;
  * - Not all values are added to the data structure, but only selected ones (according to some selectivity criteria);
  * - after processing all elements, the data structure is in some state S;
  * - as an approximate value of X, the value calculated according to the maximum likelihood principle is returned:
  * at what real value X, the probability of finding the data structure in the obtained state S is maximal.
  */
* - some data structure is used to calculate the value of X;
* - Not all values are added to the data structure, but only selected ones (according to some selectivity criteria);
* - after processing all elements, the data structure is in some state S;
* - as an approximate value of X, the value calculated according to the maximum likelihood principle is returned:
* at what real value X, the probability of finding the data structure in the obtained state S is maximal.
*/
/** In particular, what is described below can be found by the name of the BJKST algorithm.
  */
*/
/** Very simple hash-set for approximate number of unique values.
  * Works like this:
  * - you can insert UInt64;
  * - before insertion, first the hash function UInt64 -> UInt32 is calculated;
  * - the original value is not saved (lost);
  * - further all operations are made with these hashes;
  * - hash table is constructed according to the scheme:
  * - open addressing (one buffer, position in buffer is calculated by taking remainder of division by its size);
  * - linear probing (if the cell already has a value, then the cell following it is taken, etc.);
  * - the missing value is zero-encoded; to remember presence of zero in set, separate variable of type bool is used;
  * - buffer growth by 2 times when filling more than 50%;
  * - if the set has more UNIQUES_HASH_MAX_SIZE elements, then all the elements are removed from the set,
  * not divisible by 2, and then all elements that do not divide by 2 are not inserted into the set;
  * - if the situation repeats, then only elements dividing by 4, etc., are taken.
  * - the size() method returns an approximate number of elements that have been inserted into the set;
  * - there are methods for quick reading and writing in binary and text form.
  */
* Works like this:
* - you can insert UInt64;
* - before insertion, the hash function UInt64 -> UInt32 is calculated first;
* - the original value is not saved (it is lost);
* - all further operations are performed on these hashes;
* - the hash table is constructed according to the scheme:
* - open addressing (one buffer, position in the buffer is calculated as the remainder of division by its size);
* - linear probing (if the cell already holds a value, the next cell is taken, and so on);
* - the missing value is encoded as zero; to remember the presence of zero in the set, a separate bool variable is used;
* - the buffer grows by a factor of 2 when it is more than 50% full;
* - if the set has more than UNIQUES_HASH_MAX_SIZE elements, all elements not divisible by 2 are removed from the set,
* and from then on elements not divisible by 2 are not inserted at all;
* - if the situation repeats, only elements divisible by 4 are kept, and so on.
* - the size() method returns the approximate number of elements that have been inserted into the set;
* - there are methods for fast reading and writing in binary and text form.
*/
/// The maximum degree (power of two) of the buffer size, after which values start to be discarded
#define UNIQUES_HASH_MAX_SIZE_DEGREE 17
@ -50,8 +50,8 @@
#define UNIQUES_HASH_MAX_SIZE (1ULL << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))
/** The number of least significant bits used for thinning. The remaining high-order bits are used to determine the position in the hash table.
  * (high-order bits are taken because the younger bits will be constant after dropping some of the values)
  */
* (high-order bits are taken because the low-order bits will be constant after some of the values are dropped)
*/
#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)
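The two defines above split a 32-bit hash into a thinning part (low bits) and an addressing part (high bits). A minimal sketch of that scheme under illustrative names (the real UniquesHashSet adds resizing, zero handling, and serialization):

#include <cstdint>
#include <cstddef>

/// An element survives thinning if its low skip_degree bits are zero,
/// i.e. it is divisible by 2^skip_degree.
inline bool isGood(uint32_t hash, uint8_t skip_degree)
{
    return hash == ((hash >> skip_degree) << skip_degree);
}

/// Position in a buffer of size 2^size_degree is taken from the high-order
/// bits, because after thinning the low-order bits are constant (zero).
/// UNIQUES_HASH_BITS_FOR_SKIP is the define above.
inline size_t place(uint32_t hash, size_t size_degree)
{
    return (hash >> UNIQUES_HASH_BITS_FOR_SKIP) & ((1ULL << size_degree) - 1);
}

/// Each stored element represents about 2^skip_degree inserted ones;
/// the real size() additionally corrects for hash collisions.
inline double approximateSize(size_t stored, uint8_t skip_degree)
{
    return static_cast<double>(stored) * (1ULL << skip_degree);
}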
/// Initial buffer size degree
@ -59,8 +59,8 @@
/** This hash function is not the most optimal, but UniquesHashSet states counted with it,
  * stored in many places on disks (in the Yandex.Metrika), so it continues to be used.
  */
* stored in many places on disk (in Yandex.Metrika), so it continues to be used.
*/
struct UniquesHashSetDefaultHash
{
size_t operator() (UInt64 x) const

View File

@ -9,9 +9,9 @@
/** In a loop it connects to the server and immediately breaks the connection.
  * Using the SO_LINGER option, we ensure that the connection is terminated by sending a RST packet (not FIN).
  * This behavior causes a bug in the TCPServer implementation in the Poco library.
  */
* Using the SO_LINGER option, we ensure that the connection is terminated by sending an RST packet (not FIN).
* This behavior triggers a bug in the TCPServer implementation of the Poco library.
*/
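A sketch of the SO_LINGER trick described above (POSIX sockets, error handling omitted): enabling linger with a zero timeout makes close() drop the send queue and terminate the connection with RST instead of the usual FIN handshake.

#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

/// Connect to `addr` and immediately abort the connection with an RST packet.
inline void connectAndReset(const sockaddr_in & addr)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    connect(fd, reinterpret_cast<const sockaddr *>(&addr), sizeof(addr));

    linger l{};
    l.l_onoff = 1;  /// enable lingering
    l.l_linger = 0; /// zero timeout: close() discards unsent data and sends RST
    setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l));

    close(fd); /// terminates the connection with RST, not FIN
}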
int main(int argc, char ** argv)
try
{

View File

@ -1,4 +1,4 @@
set(SRCS)
add_executable (column_unique column_unique.cpp ${SRCS})
if(USE_GTEST)
add_executable(column_unique column_unique.cpp)
target_link_libraries(column_unique PRIVATE dbms gtest_main)
endif()

View File

@ -408,6 +408,11 @@ namespace ErrorCodes
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431;
extern const int UNKNOWN_CODEC = 432;
extern const int ILLEGAL_CODEC_PARAMETER = 433;
extern const int CANNOT_PARSE_PROTOBUF_SCHEMA = 434;
extern const int NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD = 435;
extern const int CANNOT_CONVERT_TO_PROTOBUF_TYPE = 436;
extern const int PROTOBUF_FIELD_NOT_REPEATED = 437;
extern const int DATA_TYPE_CANNOT_BE_PROMOTED = 438;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -45,10 +45,10 @@ static int sigtimedwait(const sigset_t *set, siginfo_t *info, const struct times
/** As long as there exists an object of this class - it blocks the INT signal, at the same time it lets you know if it came.
  * This is necessary so that you can interrupt the execution of the request with Ctrl+C.
  * Use only one instance of this class at a time.
  * If `check` method returns true (the signal has arrived), the next call will wait for the next signal.
  */
* This is necessary so that you can interrupt the execution of the query with Ctrl+C.
* Use only one instance of this class at a time.
* If the `check` method returns true (the signal has arrived), the next call will wait for the next signal.
*/
class InterruptListener
{
private:
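A sketch of the pattern this class implements (POSIX signal APIs, simplified and without error handling): block SIGINT for the calling thread, then poll for a pending signal with a zero timeout.

#include <csignal>
#include <ctime>

/// Block SIGINT and report whether it has arrived since the last check.
class SigintPoller
{
    sigset_t set;

public:
    SigintPoller()
    {
        sigemptyset(&set);
        sigaddset(&set, SIGINT);
        pthread_sigmask(SIG_BLOCK, &set, nullptr); /// SIGINT is now queued instead of delivered
    }

    ~SigintPoller() { pthread_sigmask(SIG_UNBLOCK, &set, nullptr); }

    /// Returns true if Ctrl+C was pressed; consumes the pending signal,
    /// so the next call waits for the next signal.
    bool check()
    {
        timespec timeout{0, 0};
        return sigtimedwait(&set, nullptr, &timeout) == SIGINT;
    }
};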

View File

@ -17,6 +17,7 @@
#cmakedefine01 USE_HDFS
#cmakedefine01 USE_XXHASH
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_PROTOBUF
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 LLVM_HAS_RTTI

View File

@ -70,7 +70,7 @@ inline void copyOverlap8(UInt8 * op, const UInt8 *& match, const size_t offset)
}
#ifdef __x86_64__
#if defined(__x86_64__) || defined(__PPC__)
/** We use 'xmm' (128bit SSE) registers here to shuffle 16 bytes.
*
@ -260,7 +260,7 @@ inline void copyOverlap16(UInt8 * op, const UInt8 *& match, const size_t offset)
}
#ifdef __x86_64__
#if defined(__x86_64__) || defined(__PPC__)
inline void copyOverlap16Shuffle(UInt8 * op, const UInt8 *& match, const size_t offset)
{
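For context on the copyOverlap functions whose #ifdef guards are extended to PowerPC here: when the LZ4 match offset is smaller than the copy width, the copied bytes must repeat the short pattern preceding the output position. The SSE path does this with a single 16-byte shuffle; a scalar equivalent looks roughly like this (illustrative; the real code also advances `match` using precomputed shift tables):

#include <cstdint>
#include <cstddef>

/// Produce 16 output bytes from a match window narrower than 16 bytes
/// by repeating the pattern of length `offset` (offset >= 1).
inline void copyOverlap16Scalar(uint8_t * op, const uint8_t * match, size_t offset)
{
    for (size_t i = 0; i < 16; ++i)
        op[i] = match[i % offset];
}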

View File

@ -427,6 +427,18 @@ Names Block::getNames() const
}
DataTypes Block::getDataTypes() const
{
DataTypes res;
res.reserve(columns());
for (const auto & elem : data)
res.push_back(elem.type);
return res;
}
template <typename ReturnType>
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{

View File

@ -82,6 +82,7 @@ public:
const ColumnsWithTypeAndName & getColumnsWithTypeAndName() const;
NamesAndTypesList getNamesAndTypesList() const;
Names getNames() const;
DataTypes getDataTypes() const;
/// Returns number of rows from first column in block, not equal to nullptr. If no columns, returns 0.
size_t rows() const;

View File

@ -82,9 +82,9 @@
#endif
#define PLATFORM_NOT_SUPPORTED "The only supported platforms are x86_64 and AArch64 (work in progress)"
#define PLATFORM_NOT_SUPPORTED "The only supported platforms are x86_64 and AArch64, PowerPC (work in progress)"
#if !defined(__x86_64__) && !defined(__aarch64__)
#if !defined(__x86_64__) && !defined(__aarch64__) && !defined(__PPC__)
// #error PLATFORM_NOT_SUPPORTED
#endif

View File

@ -166,3 +166,20 @@ template <> constexpr bool IsDecimalNumber<Decimal64> = true;
template <> constexpr bool IsDecimalNumber<Decimal128> = true;
}
/// Specialization of `std::hash` for the Decimal<T> types.
namespace std
{
template <typename T>
struct hash<DB::Decimal<T>> { size_t operator()(const DB::Decimal<T> & x) const { return hash<T>()(x.value); } };
template <>
struct hash<DB::Decimal128>
{
size_t operator()(const DB::Decimal128 & x) const
{
return std::hash<DB::Int64>()(x.value >> 64)
^ std::hash<DB::Int64>()(x.value & std::numeric_limits<DB::UInt64>::max());
}
};
}
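The specialization above folds the 128-bit decimal into a single size_t by hashing its two 64-bit halves and XOR-ing the results, since std::hash only covers builtin integers up to 64 bits; this is what lets Decimal values serve as keys of std::unordered_map and std::unordered_set. A self-contained analogue for a raw 128-bit integer (assuming compiler support for __int128):

#include <cstdint>
#include <functional>
#include <limits>

/// Hash a 128-bit integer by combining the hashes of its 64-bit halves.
inline size_t hash128(__int128 x)
{
    return std::hash<int64_t>()(static_cast<int64_t>(x >> 64))
        ^ std::hash<int64_t>()(static_cast<int64_t>(x & std::numeric_limits<uint64_t>::max()));
}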

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/ColumnWithTypeAndName.h>
namespace DB
@ -9,7 +9,7 @@ namespace DB
/** Adds a materialized const column to the block with a specified value.
*/
template <typename T>
class AddingConstColumnBlockInputStream : public IProfilingBlockInputStream
class AddingConstColumnBlockInputStream : public IBlockInputStream
{
public:
AddingConstColumnBlockInputStream(

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/Context.h>
@ -9,7 +9,7 @@ namespace DB
{
/// Adds defaults to columns using BlockDelayedDefaults bitmask attached to Block by child InputStream.
class AddingDefaultsBlockInputStream : public IProfilingBlockInputStream
class AddingDefaultsBlockInputStream : public IBlockInputStream
{
public:
AddingDefaultsBlockInputStream(

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/ColumnDefault.h>
@ -14,7 +14,7 @@ namespace DB
* 3. Columns that are materialized from other columns (materialized columns)
* All three types of columns are materialized (not constants).
*/
class AddingMissedBlockInputStream : public IProfilingBlockInputStream
class AddingMissedBlockInputStream : public IBlockInputStream
{
public:
AddingMissedBlockInputStream(

View File

@ -3,7 +3,7 @@
#include <Interpreters/Aggregator.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -15,7 +15,7 @@ namespace DB
* If final = false, the aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations.
* This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data).
*/
class AggregatingBlockInputStream : public IProfilingBlockInputStream
class AggregatingBlockInputStream : public IBlockInputStream
{
public:
/** keys are taken from the GROUP BY part of the query

View File

@ -2,7 +2,7 @@
#include <Poco/Event.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <common/ThreadPool.h>
@ -26,7 +26,7 @@ namespace DB
* has come over the network with a request to interrupt the execution of the query.
* It also allows you to execute multiple queries at the same time.
*/
class AsynchronousBlockInputStream : public IProfilingBlockInputStream
class AsynchronousBlockInputStream : public IBlockInputStream
{
public:
AsynchronousBlockInputStream(const BlockInputStreamPtr & in)

View File

@ -1,5 +1,5 @@
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -77,7 +77,7 @@ void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, B
return;
}
parent->forEachProfilingChild([&] (IProfilingBlockInputStream & child)
parent->forEachChild([&] (IBlockInputStream & child)
{
child.getProfileInfo().collectInfosForStreamsWithName(name, res);
return false;
@ -107,7 +107,7 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{
info_limit_or_sort->parent->forEachProfilingChild([&] (IProfilingBlockInputStream & child)
info_limit_or_sort->parent->forEachChild([&] (IBlockInputStream & child)
{
rows_before_limit += child.getProfileInfo().rows;
return false;

View File

@ -10,13 +10,13 @@ namespace DB
class Block;
class ReadBuffer;
class WriteBuffer;
class IProfilingBlockInputStream;
class IBlockInputStream;
/// Information for profiling. See IProfilingBlockInputStream.h
/// Information for profiling. See IBlockInputStream.h
struct BlockStreamProfileInfo
{
/// Info about stream object this profile info refers to.
IProfilingBlockInputStream * parent = nullptr;
IBlockInputStream * parent = nullptr;
bool started = false;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; /// Time with waiting time

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -9,7 +9,7 @@ namespace DB
/** A stream of blocks that reads successive blocks from an explicitly provided list.
* Also see OneBlockInputStream.
*/
class BlocksListBlockInputStream : public IProfilingBlockInputStream
class BlocksListBlockInputStream : public IBlockInputStream
{
public:
/// Acquires the ownership of the block list.

View File

@ -1,6 +1,6 @@
#pragma once
#include <common/logger_useful.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Core/SortDescription.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
@ -12,7 +12,7 @@ namespace DB
/// Collapses the same rows with the opposite sign roughly like CollapsingSortedBlockInputStream.
/// Outputs the rows in random order (the input streams must still be ordered).
/// Outputs only rows with a positive sign.
class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
class CollapsingFinalBlockInputStream : public IBlockInputStream
{
public:
CollapsingFinalBlockInputStream(

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <IO/ReadBuffer.h>
#include <Common/PODArray.h>
@ -53,7 +53,7 @@ using MergedRowSources = PODArray<RowSourcePart>;
* Stream mask maps row number to index of source stream.
* Streams should contain exactly one column.
*/
class ColumnGathererStream : public IProfilingBlockInputStream
class ColumnGathererStream : public IBlockInputStream
{
public:
ColumnGathererStream(

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -11,7 +11,7 @@ namespace DB
* Unlike UnionBlockInputStream, it does this sequentially.
* Blocks of different sources are not interleaved with each other.
*/
class ConcatBlockInputStream : public IProfilingBlockInputStream
class ConcatBlockInputStream : public IBlockInputStream
{
public:
ConcatBlockInputStream(BlockInputStreams inputs_)

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Columns/ColumnLowCardinality.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnConst.h>
@ -13,7 +13,7 @@ namespace DB
* Unlike UnionBlockInputStream, it does this sequentially.
* Blocks of different sources are not interleaved with each other.
*/
class ConvertColumnLowCardinalityToFullBlockInputStream : public IProfilingBlockInputStream
class ConvertColumnLowCardinalityToFullBlockInputStream : public IBlockInputStream
{
public:
explicit ConvertColumnLowCardinalityToFullBlockInputStream(const BlockInputStreamPtr & input)

View File

@ -1,7 +1,7 @@
#pragma once
#include <unordered_map>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -20,7 +20,7 @@ namespace DB
* throw if they are const in result and non const in source,
* or if they are const and have different values.
*/
class ConvertingBlockInputStream : public IProfilingBlockInputStream
class ConvertingBlockInputStream : public IBlockInputStream
{
public:
enum class MatchColumnsMode

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/ProcessList.h>

View File

@ -59,12 +59,7 @@ void CreatingSetsBlockInputStream::readPrefixImpl()
Block CreatingSetsBlockInputStream::getTotals()
{
auto input = dynamic_cast<IProfilingBlockInputStream *>(children.back().get());
if (input)
return input->getTotals();
else
return totals;
return children.back()->getTotals();
}
@ -158,9 +153,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (done_with_set && done_with_join && done_with_table)
{
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
profiling_in->cancel(false);
subquery.source->cancel(false);
break;
}
}
@ -171,15 +164,12 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
watch.stop();
size_t head_rows = 0;
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
{
const BlockStreamProfileInfo & profile_info = profiling_in->getProfileInfo();
const BlockStreamProfileInfo & profile_info = subquery.source->getProfileInfo();
head_rows = profile_info.rows;
if (subquery.join)
subquery.join->setTotals(profiling_in->getTotals());
}
subquery.join->setTotals(subquery.source->getTotals());
if (head_rows != 0)
{

View File

@ -1,7 +1,7 @@
#pragma once
#include <Poco/Logger.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
@ -14,7 +14,7 @@ namespace DB
* in the `readPrefix` function or before reading the first block
* initializes all the passed sets.
*/
class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
class CreatingSetsBlockInputStream : public IBlockInputStream
{
public:
CreatingSetsBlockInputStream(

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>
@ -14,7 +14,7 @@ class ExpressionActions;
/** Takes blocks after grouping, with non-finalized aggregate functions.
* Calculates all subsets of columns and aggregates over them.
*/
class CubeBlockInputStream : public IProfilingBlockInputStream
class CubeBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/SetVariants.h>
namespace DB
@ -13,7 +13,7 @@ namespace DB
* set limit_hint to a non-zero value, so that we stop emitting new rows once
* the count of already emitted rows reaches the limit_hint.
*/
class DistinctBlockInputStream : public IProfilingBlockInputStream
class DistinctBlockInputStream : public IBlockInputStream
{
public:
/// Empty columns_ means all columns.
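A sketch of the limit_hint optimization described in the comment above (illustrative only; the real stream works block-wise on SetVariants rather than a plain std::unordered_set):

#include <cstdint>
#include <unordered_set>
#include <vector>

/// Emit distinct values, stopping early once limit_hint rows have been
/// produced (limit_hint == 0 means no limit).
inline std::vector<uint64_t> distinctWithLimitHint(const std::vector<uint64_t> & input, size_t limit_hint)
{
    std::unordered_set<uint64_t> seen;
    std::vector<uint64_t> out;
    for (uint64_t v : input)
    {
        if (seen.insert(v).second)
            out.push_back(v);
        if (limit_hint && out.size() >= limit_hint)
            break; /// enough distinct rows emitted, no need to scan further
    }
    return out;
}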

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/SetVariants.h>
@ -17,7 +17,7 @@ namespace DB
* set limit_hint to a non-zero value, so that we stop emitting new rows once
* the count of already emitted rows reaches the limit_hint.
*/
class DistinctSortedBlockInputStream : public IProfilingBlockInputStream
class DistinctSortedBlockInputStream : public IBlockInputStream
{
public:
/// Empty columns_ means all columns.

View File

@ -15,11 +15,8 @@ String ExpressionBlockInputStream::getName() const { return "Expression"; }
Block ExpressionBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
totals = children.back()->getTotals();
expression->executeOnTotals(totals);
}
return totals;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -13,7 +13,7 @@ class ExpressionActions;
* For example: hits * 2 + 3, url LIKE '%yandex%'
* The expression processes each row independently of the others.
*/
class ExpressionBlockInputStream : public IProfilingBlockInputStream
class ExpressionBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

View File

@ -52,11 +52,8 @@ String FilterBlockInputStream::getName() const { return "Filter"; }
Block FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
totals = children.back()->getTotals();
expression->executeOnTotals(totals);
}
return totals;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Columns/FilterDescription.h>
@ -14,7 +14,7 @@ class ExpressionActions;
* A stream of blocks and an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions, are passed as input.
* The expression is evaluated and a stream of blocks is returned, which contains only the filtered rows.
*/
class FilterBlockInputStream : public IProfilingBlockInputStream
class FilterBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <iostream>
namespace DB
@ -9,7 +9,7 @@ namespace DB
/// Removes columns other than columns_to_save_ from block,
/// and reorders columns as in columns_to_save_.
/// Functionality is similar to ExpressionBlockInputStream with ExpressionActions containing PROJECT action.
class FilterColumnsBlockInputStream : public IProfilingBlockInputStream
class FilterColumnsBlockInputStream : public IBlockInputStream
{
public:
FilterColumnsBlockInputStream(

View File

@ -2,7 +2,7 @@
#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -11,7 +11,7 @@ namespace DB
/** Takes a stream already sorted by `x` and finishes sorting it by (`x`, `y`).
* During sorting, only blocks with rows that are equal by `x` are kept in RAM.
*/
class FinishSortingBlockInputStream : public IProfilingBlockInputStream
class FinishSortingBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.

View File

@ -1,9 +1,14 @@
#include <math.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
#include <Common/CurrentThread.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
@ -11,12 +16,414 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
extern const int TOO_DEEP_PIPELINE;
}
/** It's safe to access children without mutex as long as these methods are called before first call to read, readPrefix.
/// It's safe to access children without mutex as long as these methods are called before first call to `read()` or `readPrefix()`.
Block IBlockInputStream::read()
{
if (total_rows_approx)
{
progressImpl(Progress(0, 0, total_rows_approx));
total_rows_approx = 0;
}
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}
Block res;
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimit())
limit_exceeded_need_break = true;
if (!limit_exceeded_need_break)
res = readImpl();
if (res)
{
info.update(res);
if (enabled_extremes)
updateExtremes(res);
if (limits.mode == LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (quota != nullptr)
checkQuota(res);
}
else
{
/** If the stream is exhausted, ask all children to abort execution.
* This makes sense when running a query with LIMIT:
* it can happen that all the necessary data has already been read,
* but child sources are still working,
* possibly in separate threads or even remotely.
*/
cancel(false);
}
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}
void IBlockInputStream::readPrefix()
{
readPrefixImpl();
forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
}
void IBlockInputStream::readSuffix()
{
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});
readSuffixImpl();
}
void IBlockInputStream::updateExtremes(Block & block)
{
size_t num_columns = block.columns();
if (!extremes)
{
MutableColumns extremes_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = block.safeGetByPosition(i).column;
if (src->isColumnConst())
{
/// Equal min and max.
extremes_columns[i] = src->cloneResized(2);
}
else
{
Field min_value;
Field max_value;
src->getExtremes(min_value, max_value);
extremes_columns[i] = src->cloneEmpty();
extremes_columns[i]->insert(min_value);
extremes_columns[i]->insert(max_value);
}
}
extremes = block.cloneWithColumns(std::move(extremes_columns));
}
else
{
for (size_t i = 0; i < num_columns; ++i)
{
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
if (old_extremes->isColumnConst())
continue;
Field min_value = (*old_extremes)[0];
Field max_value = (*old_extremes)[1];
Field cur_min_value;
Field cur_max_value;
block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);
old_extremes = std::move(new_extremes);
}
}
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
bool IBlockInputStream::checkTimeLimit()
{
if (limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
void IBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LIMITS_CURRENT:
{
time_t current_time = time(nullptr);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
prev_elapsed = total_elapsed;
break;
}
}
}
void IBlockInputStream::progressImpl(const Progress & value)
{
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel(/* kill */ true);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.rows, progress.total_rows);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LIMITS_TOTAL
&& ((limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes)))
{
switch (limits.size_limits.overflow_mode)
{
case OverflowMode::THROW:
{
if (limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.size_limits.max_rows),
ErrorCodes::TOO_MANY_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.bytes)
+ " bytes read, maximum: " + toString(limits.size_limits.max_bytes),
ErrorCodes::TOO_MANY_BYTES);
}
case OverflowMode::BREAK:
{
/// For `break`, we will stop only if so many rows were actually read, and not just supposed to be read.
if ((limits.size_limits.max_rows && progress.rows > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes))
{
cancel(false);
}
break;
}
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
size_t total_rows = progress.total_rows;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
if ((limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > limits.timeout_before_checking_execution_speed.totalMicroseconds()))
{
/// Do not count sleeps in throttlers
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
double elapsed_seconds = (throttler_sleep_microseconds > total_elapsed_microseconds)
? 0.0 : (total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;
if (elapsed_seconds > 0)
{
if (limits.min_execution_speed && progress.rows / elapsed_seconds < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
{
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.rows);
if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+ " is too long. Maximum: " + toString(limits.max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows),
ErrorCodes::TOO_SLOW);
}
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(nullptr), value.rows, value.bytes);
}
}
}
void IBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
forEachChild([&] (IBlockInputStream & child)
{
child.cancel(kill);
return false;
});
}
bool IBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;
forEachChild([&] (IBlockInputStream & child)
{
child.setProgressCallback(callback);
return false;
});
}
void IBlockInputStream::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
forEachChild([&] (IBlockInputStream & child)
{
child.setProcessListElement(elem);
return false;
});
}
Block IBlockInputStream::getTotals()
{
if (totals)
return totals;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getTotals();
if (res)
return true;
return false;
});
return res;
}
Block IBlockInputStream::getExtremes()
{
if (extremes)
return extremes;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getExtremes();
if (res)
return true;
return false;
});
return res;
}
String IBlockInputStream::getTreeID() const
@ -40,11 +447,6 @@ String IBlockInputStream::getTreeID() const
}
size_t IBlockInputStream::checkDepth(size_t max_depth) const
{
return checkDepthImpl(max_depth, max_depth);
}
size_t IBlockInputStream::checkDepthImpl(size_t max_depth, size_t level) const
{
if (children.empty())
@ -94,4 +496,3 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
}
}

View File

@ -1,41 +1,38 @@
#pragma once
#include <vector>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <functional>
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/SizeLimits.h>
#include <IO/Progress.h>
#include <Interpreters/SettingsCommon.h>
#include <atomic>
#include <shared_mutex>
namespace DB
{
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class TableStructureReadLock;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
using TableStructureReadLocksList = std::list<TableStructureReadLockPtr>;
struct Progress;
namespace ErrorCodes
{
extern const int OUTPUT_IS_NOT_SORTED;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_WAS_CANCELLED;
}
class IBlockInputStream;
class ProcessListElement;
class QuotaForIntervals;
class QueryStatus;
class TableStructureReadLock;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
/** Callback to track the progress of the query.
* Used in IProfilingBlockInputStream and Context.
* Used in IBlockInputStream and Context.
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
@ -44,11 +41,23 @@ using ProgressCallback = std::function<void(const Progress & progress)>;
/** The stream interface for reading data by blocks from the database.
* Relational operations are also supposed to be implementations of this interface.
* Keeps track of how the source of the blocks works.
* Lets you get profiling information: rows per second, blocks per second, megabytes per second, etc.
* Allows you to stop reading data (in nested sources).
*/
class IBlockInputStream : private boost::noncopyable
class IBlockInputStream
{
friend struct BlockStreamProfileInfo;
public:
IBlockInputStream() {}
IBlockInputStream() { info.parent = this; }
virtual ~IBlockInputStream() {}
IBlockInputStream(const IBlockInputStream &) = delete;
IBlockInputStream & operator=(const IBlockInputStream &) = delete;
/// To output the data stream transformation tree (query execution plan).
virtual String getName() const = 0;
/** Get the data structure of the stream in the form of a "header" block (it is also called a "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
@ -56,52 +65,244 @@ public:
*/
virtual Block getHeader() const = 0;
/** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
* This also applies for readPrefix, readSuffix.
*/
virtual Block read() = 0;
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
/// If this stream generates data in order by some keys, return true.
virtual bool isSortedOutput() const { return false; }
/// In case of isSortedOutput, return corresponding SortDescription
virtual const SortDescription & getSortDescription() const
{
throw Exception("Output of " + getName() + " is not sorted", ErrorCodes::OUTPUT_IS_NOT_SORTED);
}
/** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
* This also applies for readPrefix, readSuffix.
*/
Block read();
/** Read something before starting all data or after the end of all data.
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
* readPrefix() must be called before the first call to read().
* readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution.
*/
virtual void readPrefix() {}
virtual void readSuffix() {}
virtual ~IBlockInputStream() {}
/** To output the data stream transformation tree (query execution plan).
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
* There are cases when you do not want `readPrefix` of children to be called synchronously, in this function,
* but you want them to be called, for example, in separate threads (for parallel initialization of children).
* In that case, override the `readPrefix` function.
*/
virtual String getName() const = 0;
virtual void readPrefix();
/// If this stream generates data in order by some keys, return true.
virtual bool isSortedOutput() const { return false; }
/// In case of isSortedOutput, return corresponding SortDescription
virtual const SortDescription & getSortDescription() const { throw Exception("Output of " + getName() + " is not sorted", ErrorCodes::OUTPUT_IS_NOT_SORTED); }
/** Must be called before read, readPrefix.
/** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself.
* If this stream calls read() in children in a separate thread, this behavior is usually incorrect:
* readSuffix() of the child must not be called while the same child's read() is executing in another thread.
* In this case, you need to override this method so that readSuffix() in children is called, for example, after connecting streams.
*/
virtual void readSuffix();
/// Must be called before `read()` and `readPrefix()`.
void dumpTree(std::ostream & ostr, size_t indent = 0, size_t multiplier = 1) const;
/** Check the depth of the pipeline.
* If max_depth is specified and the `depth` is greater - throw an exception.
* Must be called before read, readPrefix.
* Must be called before `read()` and `readPrefix()`.
*/
size_t checkDepth(size_t max_depth) const;
size_t checkDepth(size_t max_depth) const { return checkDepthImpl(max_depth, max_depth); }
/** Do not allow to change the table while the blocks stream is alive.
*/
/// Do not allow the table to be changed while the block stream is alive.
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
/** Get "total" values.
* The default implementation takes them from itself or from the first child source in which they are.
* The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
* There can be no total values - then an empty block is returned.
*
* Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data is being computed in another thread at the same time.
*/
virtual Block getTotals();
/// The same for minimums and maximums.
Block getExtremes();
/** Set the execution progress bar callback.
* The callback is passed to all child sources.
* By default, it is called for leaf sources, after each block.
* (But this can be overridden in the progress() method)
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
void setProgressCallback(const ProgressCallback & callback);
/** In this method:
* - the progress callback is called;
* - the status of the query execution in ProcessList is updated;
* - checks restrictions and quotas that should be checked not within the same source,
* but over the total amount of resources spent in all sources at once (information in the ProcessList).
*/
virtual void progress(const Progress & value)
{
/// The data for progress is taken from leaf sources.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
/** Set the pointer to the process list item.
* It is passed to all child sources.
* General information about the resources spent on the query will be written into it.
* Based on this information, the quota and some restrictions will be checked.
* This information will also be available in the SHOW PROCESSLIST query.
*/
void setProcessListElement(QueryStatus * elem);
/** Set the approximate total number of rows to read.
*/
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort the receipt of data as soon as possible.
* By default - just sets the flag is_cancelled and asks that all children be interrupted.
* This function can be called several times, including simultaneously from different threads.
* It has two modes:
* with kill = false, only is_cancelled is set - streams will stop silently, returning some processed data.
* with kill = true, is_killed is also set - queries will stop with an exception.
*/
virtual void cancel(bool kill);
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** What limitations and quotas should be checked.
* LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check).
* Currently it is used in root streams to check max_result_{rows,bytes} limits.
* LIMITS_TOTAL - checks total amount of read data from leaf streams (i.e. data read from disk and remote servers).
* It checks max_{rows,bytes}_to_read in the progress handler, using info from ProcessListElement::progress_in.
* Currently this check is performed only in leaf streams.
*/
enum LimitsMode
{
LIMITS_CURRENT,
LIMITS_TOTAL,
};
/// It is a subset of limitations from Limits.
struct LocalLimits
{
LimitsMode mode = LIMITS_CURRENT;
SizeLimits size_limits;
Poco::Timespan max_execution_time = 0;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
/// in rows per second
size_t min_execution_speed = 0;
/// Verify that the speed is not too low after the specified time has elapsed.
Poco::Timespan timeout_before_checking_execution_speed = 0;
};
/** Set limitations that are checked on each block. */
void setLimits(const LocalLimits & limits_)
{
limits = limits_;
}
const LocalLimits & getLimits() const
{
return limits;
}
/** Set the quota. If you set a quota on the amount of raw data,
* then you should also set mode = LIMITS_TOTAL in LocalLimits via setLimits().
*/
void setQuota(QuotaForIntervals & quota_)
{
quota = &quota_;
}
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
protected:
BlockInputStreams children;
std::shared_mutex children_mutex;
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// According to total_stopwatch in microseconds
UInt64 last_profile_events_update_time = 0;
/// Additional information that can be generated during execution.
/// Total values during aggregation.
Block totals;
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
Block extremes;
void addChild(BlockInputStreamPtr & child)
{
std::unique_lock lock(children_mutex);
children.push_back(child);
}
private:
TableStructureReadLocks table_locks;
bool enabled_extremes = false;
/// The limit on the number of rows/bytes has been exceeded, and execution must stop on the next `read` call, as if the stream were exhausted.
bool limit_exceeded_need_break = false;
/// Limitations and quotas.
LocalLimits limits;
QuotaForIntervals * quota = nullptr; /// If nullptr - the quota is not used.
double prev_elapsed = 0;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
/// Derived classes must implement this function.
virtual Block readImpl() = 0;
/// Here you can do a preliminary initialization.
virtual void readPrefixImpl() {}
/// Here you need to do a finalization, which can lead to an exception.
virtual void readSuffixImpl() {}
void updateExtremes(Block & block);
/** Check limits and quotas.
* But only those that can be checked within each separate stream.
*/
bool checkTimeLimit();
void checkQuota(Block & block);
size_t checkDepthImpl(size_t max_depth, size_t level) const;
/// Get text with names of this source and the entire subtree.
String getTreeID() const;
template <typename F>
void forEachChild(F && f)
@ -113,20 +314,6 @@ public:
if (f(*child))
return;
}
protected:
BlockInputStreams children;
std::shared_mutex children_mutex;
private:
TableStructureReadLocks table_locks;
size_t checkDepthImpl(size_t max_depth, size_t level) const;
/// Get text with names of this source and the entire subtree.
String getTreeID() const;
};
}
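To tie the merged interface together, a minimal usage sketch of the reading protocol and the limits API described above (illustrative, not actual ClickHouse client code; `source` is any concrete stream and `process` is a hypothetical consumer):

/// Drain a stream while honoring a 10-second execution time limit.
void drain(DB::BlockInputStreamPtr source)
{
    DB::IBlockInputStream::LocalLimits limits;
    limits.mode = DB::IBlockInputStream::LIMITS_CURRENT;
    limits.max_execution_time = Poco::Timespan(10, 0); /// 10 seconds
    limits.timeout_overflow_mode = DB::OverflowMode::BREAK; /// stop quietly instead of throwing
    source->setLimits(limits);

    source->readPrefix(); /// must precede the first read()
    while (DB::Block block = source->read()) /// an empty block signals the end
        process(block); /// hypothetical consumer
    source->readSuffix(); /// finalization; may throw
}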

View File

@ -1,427 +0,0 @@
#include <Interpreters/Quota.h>
#include <Interpreters/ProcessList.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/CurrentThread.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
IProfilingBlockInputStream::IProfilingBlockInputStream()
{
info.parent = this;
}
Block IProfilingBlockInputStream::read()
{
if (total_rows_approx)
{
progressImpl(Progress(0, 0, total_rows_approx));
total_rows_approx = 0;
}
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}
Block res;
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimit())
limit_exceeded_need_break = true;
if (!limit_exceeded_need_break)
res = readImpl();
if (res)
{
info.update(res);
if (enabled_extremes)
updateExtremes(res);
if (limits.mode == LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (quota != nullptr)
checkQuota(res);
}
else
{
/** If the thread is over, then we will ask all children to abort the execution.
* This makes sense when running a query with LIMIT
* - there is a situation when all the necessary data has already been read,
* but children sources are still working,
* herewith they can work in separate threads or even remotely.
*/
cancel(false);
}
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}
void IProfilingBlockInputStream::readPrefix()
{
readPrefixImpl();
forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
}
void IProfilingBlockInputStream::readSuffix()
{
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});
readSuffixImpl();
}
void IProfilingBlockInputStream::updateExtremes(Block & block)
{
size_t num_columns = block.columns();
if (!extremes)
{
MutableColumns extremes_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = block.safeGetByPosition(i).column;
if (src->isColumnConst())
{
/// Equal min and max.
extremes_columns[i] = src->cloneResized(2);
}
else
{
Field min_value;
Field max_value;
src->getExtremes(min_value, max_value);
extremes_columns[i] = src->cloneEmpty();
extremes_columns[i]->insert(min_value);
extremes_columns[i]->insert(max_value);
}
}
extremes = block.cloneWithColumns(std::move(extremes_columns));
}
else
{
for (size_t i = 0; i < num_columns; ++i)
{
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
if (old_extremes->isColumnConst())
continue;
Field min_value = (*old_extremes)[0];
Field max_value = (*old_extremes)[1];
Field cur_min_value;
Field cur_max_value;
block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);
old_extremes = std::move(new_extremes);
}
}
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
bool IProfilingBlockInputStream::checkTimeLimit()
{
if (limits.max_execution_time != 0
&& info.total_stopwatch.elapsed() > static_cast<UInt64>(limits.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(limits.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(info.total_stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(limits.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
void IProfilingBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LIMITS_CURRENT:
{
time_t current_time = time(nullptr);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
prev_elapsed = total_elapsed;
break;
}
}
}
void IProfilingBlockInputStream::progressImpl(const Progress & value)
{
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel(/* kill */ true);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.rows, progress.total_rows);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LIMITS_TOTAL
&& ((limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes)))
{
switch (limits.size_limits.overflow_mode)
{
case OverflowMode::THROW:
{
if (limits.size_limits.max_rows && total_rows_estimate > limits.size_limits.max_rows)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.size_limits.max_rows),
ErrorCodes::TOO_MANY_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(progress.bytes)
+ " bytes read, maximum: " + toString(limits.size_limits.max_bytes),
ErrorCodes::TOO_MANY_BYTES);
}
case OverflowMode::BREAK:
{
/// For `break`, we will stop only if so many rows were actually read, and not just supposed to be read.
if ((limits.size_limits.max_rows && progress.rows > limits.size_limits.max_rows)
|| (limits.size_limits.max_bytes && progress.bytes > limits.size_limits.max_bytes))
{
cancel(false);
}
break;
}
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
size_t total_rows = progress.total_rows;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
if ((limits.min_execution_speed || (total_rows && limits.timeout_before_checking_execution_speed != 0))
&& (static_cast<Int64>(total_elapsed_microseconds) > limits.timeout_before_checking_execution_speed.totalMicroseconds()))
{
/// Do not count sleeps in throttlers
UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];
double elapsed_seconds = (throttler_sleep_microseconds > total_elapsed_microseconds)
? 0.0 : (total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;
if (elapsed_seconds > 0)
{
if (limits.min_execution_speed && progress.rows / elapsed_seconds < limits.min_execution_speed)
throw Exception("Query is executing too slow: " + toString(progress.rows / elapsed_seconds)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
{
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.rows);
if (estimated_execution_time_seconds > limits.max_execution_time.totalSeconds())
throw Exception("Estimated query execution time (" + toString(estimated_execution_time_seconds) + " seconds)"
+ " is too long. Maximum: " + toString(limits.max_execution_time.totalSeconds())
+ ". Estimated rows to process: " + toString(total_rows),
ErrorCodes::TOO_SLOW);
}
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(nullptr), value.rows, value.bytes);
}
}
}
void IProfilingBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
child.cancel(kill);
return false;
});
}
bool IProfilingBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IProfilingBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
child.setProgressCallback(callback);
return false;
});
}
void IProfilingBlockInputStream::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
child.setProcessListElement(elem);
return false;
});
}
Block IProfilingBlockInputStream::getTotals()
{
if (totals)
return totals;
Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
res = child.getTotals();
if (res)
return true;
return false;
});
return res;
}
Block IProfilingBlockInputStream::getExtremes()
{
if (extremes)
return extremes;
Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
res = child.getExtremes();
if (res)
return true;
return false;
});
return res;
}
}

View File

@ -1,247 +0,0 @@
#pragma once
#include <IO/Progress.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SizeLimits.h>
#include <Interpreters/SettingsCommon.h>
#include <atomic>
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
}
class QuotaForIntervals;
class QueryStatus;
class ProcessListElement;
class IProfilingBlockInputStream;
using ProfilingBlockInputStreamPtr = std::shared_ptr<IProfilingBlockInputStream>;
/** Watches out at how the source of the blocks works.
* Lets you get information for profiling:
* rows per second, blocks per second, megabytes per second, etc.
* Allows you to stop reading data (in nested sources).
*/
class IProfilingBlockInputStream : public IBlockInputStream
{
friend struct BlockStreamProfileInfo;
public:
IProfilingBlockInputStream();
Block read() override final;
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
* There are cases when you do not want `readPrefix` of children to be called synchronously, in this function,
* but you want them to be called, for example, in separate threads (for parallel initialization of children).
* Then overload `readPrefix` function.
*/
void readPrefix() override;
/** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself.
* If this stream calls read() in children in a separate thread, this behavior is usually incorrect:
* readSuffix() of the child can not be called at the moment when the same child's read() is executed in another thread.
* In this case, you need to override this method so that readSuffix() in children is called, for example, after connecting streams.
*/
void readSuffix() override;
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
/** Get "total" values.
* The default implementation takes them from itself or from the first child source in which they are.
* The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
* There can be no total values - then an empty block is returned.
*
* Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data at the same time is computed in another thread.
*/
virtual Block getTotals();
/// The same for minimums and maximums.
Block getExtremes();
/** Set the execution progress bar callback.
* The callback is passed to all child sources.
* By default, it is called for leaf sources, after each block.
* (But this can be overridden in the progress() method)
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
void setProgressCallback(const ProgressCallback & callback);
/** In this method:
* - the progress callback is called;
* - the status of the query execution in ProcessList is updated;
* - checks restrictions and quotas that should be checked not within the same source,
* but over the total amount of resources spent in all sources at once (information in the ProcessList).
*/
virtual void progress(const Progress & value)
{
/// The data for progress is taken from leaf sources.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
/** Set the pointer to the process list item.
* It is passed to all child sources.
* General information about the resources spent on the query will be written into it.
* Based on this information, the quota and some restrictions will be checked.
* This information will also be available via SHOW PROCESSLIST.
*/
void setProcessListElement(QueryStatus * elem);
/** Set the approximate total number of rows to read.
*/
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort data retrieval as soon as possible.
* By default, it just sets the is_cancelled flag and asks that all children be interrupted.
* This function can be called several times, including simultaneously from different threads.
* It has two modes:
* with kill = false, only is_cancelled is set - streams stop silently, possibly returning some already processed data.
* with kill = true, is_killed is also set - queries stop with an exception.
*/
virtual void cancel(bool kill);
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** Which limitations and quotas should be checked.
* LIMITS_CURRENT - checks the amount of data read by the current stream only (BlockStreamProfileInfo is used for the check).
* Currently it is used in root streams to check the max_result_{rows,bytes} limits.
* LIMITS_TOTAL - checks the total amount of data read from leaf streams (i.e. data read from disk and remote servers).
* It checks max_{rows,bytes}_to_read in the progress handler, using info from ProcessListElement::progress_in.
* Currently this check is performed only in leaf streams.
*/
enum LimitsMode
{
LIMITS_CURRENT,
LIMITS_TOTAL,
};
/// It is a subset of limitations from Limits.
struct LocalLimits
{
LimitsMode mode = LIMITS_CURRENT;
SizeLimits size_limits;
Poco::Timespan max_execution_time = 0;
OverflowMode timeout_overflow_mode = OverflowMode::THROW;
/// in rows per second
size_t min_execution_speed = 0;
/// Verify that the speed is not too low after the specified time has elapsed.
Poco::Timespan timeout_before_checking_execution_speed = 0;
};
/** Set limitations that are checked on each block. */
void setLimits(const LocalLimits & limits_)
{
limits = limits_;
}
const LocalLimits & getLimits() const
{
return limits;
}
/** Set the quota. If you set a quota on the amount of raw data,
* then you should also set mode = LIMITS_TOTAL in LocalLimits via setLimits.
*/
void setQuota(QuotaForIntervals & quota_)
{
quota = &quota_;
}
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
protected:
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// According to total_stopwatch in microseconds
UInt64 last_profile_events_update_time = 0;
/// Additional information that can be generated during the work process.
/// Total values during aggregation.
Block totals;
/// Minimums and maximums. The first row of the block contains the minimums, the second the maximums.
Block extremes;
void addChild(BlockInputStreamPtr & child)
{
std::unique_lock lock(children_mutex);
children.push_back(child);
}
private:
bool enabled_extremes = false;
/// The limit on the number of rows/bytes has been exceeded, and execution must be stopped on the next `read` call, as if the stream ran out of data.
bool limit_exceeded_need_break = false;
/// Limitations and quotas.
LocalLimits limits;
QuotaForIntervals * quota = nullptr; /// If nullptr - the quota is not used.
double prev_elapsed = 0;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
/// The successors must implement this function.
virtual Block readImpl() = 0;
/// Here you can do a preliminary initialization.
virtual void readPrefixImpl() {}
/// Here you need to do a finalization, which can lead to an exception.
virtual void readSuffixImpl() {}
void updateExtremes(Block & block);
/** Check limits and quotas.
* But only those that can be checked within each separate stream.
*/
bool checkTimeLimit();
void checkQuota(Block & block);
template <typename F>
void forEachProfilingChild(F && f)
{
/// NOTE: Acquires a read lock, therefore f() must be thread-safe
std::shared_lock lock(children_mutex);
for (auto & child : children)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(child.get()))
if (f(*p_child))
return;
}
};
}
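
The cancel contract documented above distinguishes a soft stop (kill = false) from a hard kill (kill = true). A minimal self-contained sketch of that contract; CancellableSource and QueryCancelledException are invented names for the example, not ClickHouse classes:

// Standalone model of the two cancellation modes described above.
// All names here are illustrative; this is not the ClickHouse implementation.
#include <atomic>
#include <iostream>
#include <stdexcept>

struct QueryCancelledException : std::runtime_error
{
    QueryCancelledException() : std::runtime_error("Query was cancelled") {}
};

class CancellableSource
{
public:
    // May be called several times, from different threads.
    void cancel(bool kill)
    {
        if (kill)
            is_killed = true;
        is_cancelled = true;
    }

    bool isCancelled() const { return is_cancelled; }

    // kill = true turns a silent stop into an exception.
    bool isCancelledOrThrowIfKilled() const
    {
        if (!is_cancelled)
            return false;
        if (is_killed)
            throw QueryCancelledException();
        return true;
    }

private:
    std::atomic<bool> is_cancelled{false};
    std::atomic<bool> is_killed{false};
};

int main()
{
    CancellableSource source;
    source.cancel(/*kill=*/false);
    std::cout << source.isCancelledOrThrowIfKilled() << '\n'; // 1: silent stop

    source.cancel(/*kill=*/true);
    try
    {
        source.isCancelledOrThrowIfKilled();
    }
    catch (const QueryCancelledException & e)
    {
        std::cout << e.what() << '\n'; // Query was cancelled
    }
}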

View File

@ -1,7 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <cstddef>
#include <memory>
@ -16,7 +16,7 @@ class Context;
* The head of the inserted data may be stored in the INSERT AST directly.
* The remaining (tail) data may be stored in input_buffer_tail_part.
*/
-class InputStreamFromASTInsertQuery : public IProfilingBlockInputStream
+class InputStreamFromASTInsertQuery : public IBlockInputStream
{
public:
InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context);

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -10,7 +10,7 @@ namespace DB
* This is needed, for example, to read from a table that will be populated
* after the LazyBlockInputStream object is created, but before the first `read` call.
*/
-class LazyBlockInputStream : public IProfilingBlockInputStream
+class LazyBlockInputStream : public IBlockInputStream
{
public:
using Generator = std::function<BlockInputStreamPtr()>;
@ -42,7 +42,7 @@ protected:
if (!input)
return Block();
-auto * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get());
+auto * p_input = dynamic_cast<IBlockInputStream *>(input.get());
if (p_input)
{

View File

@ -58,8 +58,10 @@ Block LimitBlockInputStream::readImpl()
for (size_t i = 0; i < res.columns(); ++i)
res.safeGetByPosition(i).column = res.safeGetByPosition(i).column->cut(start, length);
+// TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
+// It's crucial for streaming engines like Kafka.
return res;
}
}
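
This hunk shows only the per-column cut; the start/length arithmetic it relies on is the intersection of the block's row range with the [offset, offset + limit) window. A standalone sketch of that arithmetic (limitSlice is an invented name, not the actual implementation):

// Standalone sketch of the LIMIT ... OFFSET arithmetic behind the cut() call
// above. Names and structure are illustrative, not the actual implementation.
#include <algorithm>
#include <cstdint>
#include <iostream>

struct Cut { uint64_t start; uint64_t length; };

// rows_before: rows returned by previous blocks; rows: rows in the current block.
// Returns which slice of the current block falls into [offset, offset + limit).
Cut limitSlice(uint64_t offset, uint64_t limit, uint64_t rows_before, uint64_t rows)
{
    const uint64_t block_begin = rows_before;
    const uint64_t block_end = rows_before + rows;

    const uint64_t keep_begin = std::max(block_begin, offset);
    const uint64_t keep_end = std::min(block_end, offset + limit);

    if (keep_begin >= keep_end)
        return {0, 0}; // the whole block is outside the LIMIT window

    return {keep_begin - block_begin, keep_end - keep_begin};
}

int main()
{
    // LIMIT 5 OFFSET 3 over blocks of 4 rows:
    for (uint64_t rows_before : {0u, 4u, 8u})
    {
        Cut c = limitSlice(/*offset=*/3, /*limit=*/5, rows_before, /*rows=*/4);
        std::cout << "block at " << rows_before << ": start=" << c.start
                  << " length=" << c.length << '\n';
    }
    // -> start=3 length=1, start=0 length=4, start=0 length=0
}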

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -9,7 +9,7 @@ namespace DB
/** Implements the LIMIT relational operation.
*/
-class LimitBlockInputStream : public IProfilingBlockInputStream
+class LimitBlockInputStream : public IBlockInputStream
{
public:
/** If always_read_till_end = false (by default), then after reading enough data,

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <Common/HashTable/HashMap.h>
#include <Common/UInt128.h>
@ -15,7 +15,7 @@ namespace DB
* the query SELECT Num FROM T LIMIT 2 BY Num
* will give you the following result: (Num: 1 1 3 3 4 4 5 7 7).
*/
-class LimitByBlockInputStream : public IProfilingBlockInputStream
+class LimitByBlockInputStream : public IBlockInputStream
{
public:
LimitByBlockInputStream(const BlockInputStreamPtr & input, size_t group_size_, const Names & columns);
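
The per-group behaviour described in the comment above (LIMIT 2 BY Num keeps at most two rows per distinct Num) can be modelled with a counting hash map. A standalone sketch, not the ClickHouse implementation; limitBy here is an invented helper:

// Standalone model of LIMIT n BY: keep at most group_size rows per key.
#include <cstddef>
#include <iostream>
#include <unordered_map>
#include <vector>

std::vector<int> limitBy(const std::vector<int> & values, std::size_t group_size)
{
    std::unordered_map<int, std::size_t> seen; // key -> rows already kept
    std::vector<int> result;
    for (int v : values)
        if (seen[v]++ < group_size)
            result.push_back(v);
    return result;
}

int main()
{
    // Mirrors the comment above: LIMIT 2 BY Num.
    for (int v : limitBy({1, 1, 1, 3, 3, 3, 4, 4, 5, 7, 7, 7}, 2))
        std::cout << v << ' ';
    std::cout << '\n'; // 1 1 3 3 4 4 5 7 7
}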

View File

@ -1,13 +1,13 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Converts constant columns to full columns ("materializes" them).
*/
-class MaterializingBlockInputStream : public IProfilingBlockInputStream
+class MaterializingBlockInputStream : public IBlockInputStream
{
public:
MaterializingBlockInputStream(const BlockInputStreamPtr & input);

View File

@ -8,7 +8,7 @@
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
@ -25,7 +25,7 @@ namespace DB
/** Part of the implementation. Merges an array of ready (already read from somewhere) blocks.
* Returns result of merge as stream of blocks, not more than 'max_merged_block_size' rows in each.
*/
-class MergeSortingBlocksBlockInputStream : public IProfilingBlockInputStream
+class MergeSortingBlocksBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
@ -66,7 +66,7 @@ private:
};
-class MergeSortingBlockInputStream : public IProfilingBlockInputStream
+class MergeSortingBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.

View File

@ -1,7 +1,7 @@
#pragma once
#include <Interpreters/Aggregator.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -11,7 +11,7 @@ namespace DB
/** A stream of pre-aggregated blocks, i.e. each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged.
*/
-class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream
+class MergingAggregatedBlockInputStream : public IBlockInputStream
{
public:
MergingAggregatedBlockInputStream(const BlockInputStreamPtr & input, const Aggregator::Params & params, bool final_, size_t max_threads_)

View File

@ -135,12 +135,10 @@ void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
}
for (auto & input : inputs)
{
-if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(input.stream.get()))
-{
try
{
-child->cancel(kill);
+input.stream->cancel(kill);
}
catch (...)
{
@ -148,8 +146,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
* (example: connection reset during distributed query execution)
* - then don't care.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
LOG_ERROR(log, "Exception while cancelling " << input.stream->getName());
}
}
}
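
With cancel() now virtual on the IBlockInputStream base, the dynamic_cast disappears, but the surrounding pattern stays: cancel every input, then log and swallow per-input failures so that one broken connection does not prevent cancelling the rest. A standalone model of that pattern, with invented names:

// Standalone model of the cancellation loop above: cancel every input,
// log and ignore individual failures. Names are illustrative.
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct Input
{
    std::string name;
    bool broken = false; // simulates e.g. a reset connection

    void cancel(bool /*kill*/)
    {
        if (broken)
            throw std::runtime_error("connection reset");
    }
    const std::string & getName() const { return name; }
};

void cancelAll(std::vector<std::shared_ptr<Input>> & inputs, bool kill)
{
    for (auto & input : inputs)
    {
        try
        {
            input->cancel(kill);
        }
        catch (...)
        {
            // If one source cannot be cancelled (e.g. connection reset during
            // distributed query execution), don't care: keep cancelling the rest.
            std::cerr << "Exception while cancelling " << input->getName() << '\n';
        }
    }
}

int main()
{
    std::vector<std::shared_ptr<Input>> inputs = {
        std::make_shared<Input>(Input{"remote-1", true}),
        std::make_shared<Input>(Input{"local-2", false}),
    };
    cancelAll(inputs, /*kill=*/false); // logs remote-1, still cancels local-2
}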

View File

@ -1,7 +1,7 @@
#pragma once
#include <Interpreters/Aggregator.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentThread.h>
#include <common/ThreadPool.h>
@ -57,7 +57,7 @@ namespace DB
* data from sources can also be read in several threads (reading_threads)
* for optimal performance in the presence of a fast network or disks (from where these blocks are read).
*/
-class MergingAggregatedMemoryEfficientBlockInputStream final : public IProfilingBlockInputStream
+class MergingAggregatedMemoryEfficientBlockInputStream final : public IBlockInputStream
{
public:
MergingAggregatedMemoryEfficientBlockInputStream(

View File

@ -12,7 +12,7 @@
#include <IO/WriteHelpers.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
@ -60,7 +60,7 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
/** Merges several sorted streams into one sorted stream.
*/
-class MergingSortedBlockInputStream : public IProfilingBlockInputStream
+class MergingSortedBlockInputStream : public IBlockInputStream
{
public:
/** limit - if isn't 0, then we can produce only first limit rows in sorted order.

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Common/PODArray.h>
@ -57,7 +57,7 @@ struct IndexForNativeFormat
* Can also be used to store data on disk.
* In this case, can use the index.
*/
-class NativeBlockInputStream : public IProfilingBlockInputStream
+class NativeBlockInputStream : public IBlockInputStream
{
public:
/// If a non-zero server_revision is specified, additional block information may be expected and read.

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/copyData.h>
@ -17,7 +17,7 @@ using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
* The query could be executed without wrapping it in an empty BlockInputStream,
* but the progress of query execution and the ability to cancel the query would not work.
*/
-class NullAndDoCopyBlockInputStream : public IProfilingBlockInputStream
+class NullAndDoCopyBlockInputStream : public IBlockInputStream
{
public:
NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)

View File

@ -6,19 +6,19 @@
namespace DB
{
-/** Empty stream of blocks of specified structure.
-*/
+/// Empty stream of blocks of specified structure.
class NullBlockInputStream : public IBlockInputStream
{
public:
-NullBlockInputStream(const Block & header) : header(header) {}
+NullBlockInputStream(const Block & header_) : header(header_) {}
-Block read() override { return {}; }
Block getHeader() const override { return header; }
String getName() const override { return "Null"; }
private:
Block header;
+Block readImpl() override { return {}; }
};
-}
+} /// namespace DB

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -9,7 +9,7 @@ namespace DB
/** A stream of blocks from which you can read one block.
* Also see BlocksListBlockInputStream.
*/
-class OneBlockInputStream : public IProfilingBlockInputStream
+class OneBlockInputStream : public IBlockInputStream
{
public:
OneBlockInputStream(const Block & block_) : block(block_) {}

View File

@ -2,7 +2,7 @@
#include <memory>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
{
@ -11,7 +11,7 @@ namespace DB
* simplifies usage of ReadBufferFromFile (no need to manage buffer lifetime) etc.
*/
template <typename OwnType>
-class OwningBlockInputStream : public IProfilingBlockInputStream
+class OwningBlockInputStream : public IBlockInputStream
{
public:
OwningBlockInputStream(const BlockInputStreamPtr & stream, std::unique_ptr<OwnType> own)

View File

@ -3,7 +3,7 @@
#include <Interpreters/Aggregator.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ParallelInputsProcessor.h>
@ -16,7 +16,7 @@ namespace DB
* If final == false, aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations.
* This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data).
*/
-class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
+class ParallelAggregatingBlockInputStream : public IBlockInputStream
{
public:
/** Columns from key_names and arguments of aggregate functions must already be computed.

View File

@ -8,7 +8,7 @@
#include <common/logger_useful.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
@ -124,12 +124,10 @@ public:
finish = true;
for (auto & input : inputs)
{
-if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
-{
try
{
-child->cancel(kill);
+input->cancel(kill);
}
catch (...)
{
@ -137,8 +135,7 @@ public:
* (for example, the connection is broken for distributed query processing)
* - then do not care.
*/
LOG_ERROR(log, "Exception while cancelling " << child->getName());
}
LOG_ERROR(log, "Exception while cancelling " << input->getName());
}
}
}

View File

@ -2,7 +2,7 @@
#include <Core/SortDescription.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -11,7 +11,7 @@ namespace DB
/** Sorts each block individually by the values of the specified columns.
* At the moment, a rather suboptimal algorithm is used.
*/
-class PartialSortingBlockInputStream : public IProfilingBlockInputStream
+class PartialSortingBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, then you can sort each block not completely, but only `limit` first rows by order.

View File

@ -109,8 +109,7 @@ void RemoteBlockInputStream::cancel(bool kill)
/// Stop sending external data.
for (auto & vec : external_tables_data)
for (auto & elem : vec)
-if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(elem.first.get()))
-stream->cancel(kill);
+elem.first->cancel(kill);
}
if (!isQueryPending() || hasThrownException())

View File

@ -4,7 +4,7 @@
#include <common/logger_useful.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <Common/Throttler.h>
#include <Interpreters/Context.h>
#include <Client/ConnectionPool.h>
@ -17,7 +17,7 @@ namespace DB
/** This class allows one to launch queries on remote replicas of one shard and get results
*/
-class RemoteBlockInputStream : public IProfilingBlockInputStream
+class RemoteBlockInputStream : public IBlockInputStream
{
public:
/// Takes already set connection.

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>
@ -14,7 +14,7 @@ class ExpressionActions;
/** Takes blocks after grouping, with non-finalized aggregate functions.
* Calculates subtotals and grand totals for a set of columns.
*/
-class RollupBlockInputStream : public IProfilingBlockInputStream
+class RollupBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SquashingTransform.h>
@ -9,7 +9,7 @@ namespace DB
/** Merges consecutive blocks of a stream up to a specified minimum size.
*/
-class SquashingBlockInputStream : public IProfilingBlockInputStream
+class SquashingBlockInputStream : public IBlockInputStream
{
public:
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);

View File

@ -181,7 +181,7 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
if (map_desc.key_col_nums.size() == 1)
{
// Create summation for all value columns in the map
desc.init("sumMap", argument_types);
desc.init("sumMapWithOverflow", argument_types);
columns_to_aggregate.emplace_back(std::move(desc));
}
else
@ -220,7 +220,7 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
}
else
{
-/// It is sumMap aggregate function.
+/// It is sumMapWithOverflow aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
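
The switch from sumMap to sumMapWithOverflow matters because SummingMergeTree must produce merged rows whose column types match the table's column types; the WithOverflow variants keep the argument type instead of widening it. A simplified standalone illustration of the difference (not the actual aggregate function code):

// Standalone sketch: "with overflow" summation keeps the argument type
// (wrapping on overflow), while ordinary summation widens the result type.
#include <cstdint>
#include <iostream>

// Ordinary sum: result type is widened, no overflow for small inputs.
std::uint64_t sumWidened(std::uint8_t a, std::uint8_t b)
{
    return static_cast<std::uint64_t>(a) + b;
}

// Sum "with overflow": result type equals the argument type, wraps mod 256.
std::uint8_t sumWithOverflow(std::uint8_t a, std::uint8_t b)
{
    return static_cast<std::uint8_t>(a + b);
}

int main()
{
    std::uint8_t a = 200, b = 100;
    std::cout << sumWidened(a, b) << '\n';                             // 300
    std::cout << static_cast<unsigned>(sumWithOverflow(a, b)) << '\n'; // 44
}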

View File

@ -70,7 +70,7 @@ private:
/// Stores aggregation function, state, and columns to be used as function arguments
struct AggregateDescription
{
-/// An aggregate function 'sumWithOverflow' or 'sumMap' for summing.
+/// An aggregate function 'sumWithOverflow' or 'sumMapWithOverflow' for summing.
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
std::vector<size_t> column_numbers;

View File

@ -1,6 +1,6 @@
#pragma once
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
namespace DB
@ -16,7 +16,7 @@ class ExpressionActions;
* Calculates total values according to totals_mode.
* If necessary, evaluates the expression from HAVING and filters rows. Returns the finalized and filtered blocks.
*/
-class TotalsHavingBlockInputStream : public IProfilingBlockInputStream
+class TotalsHavingBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
@ -52,7 +52,7 @@ private:
*/
Block overflow_aggregates;
-/// Here, total values are accumulated. After the work is finished, they will be placed in IProfilingBlockInputStream::totals.
+/// Here, total values are accumulated. After the work is finished, they will be placed in IBlockInputStream::totals.
MutableColumns current_totals;
/// Arena for aggregate function states in totals.
ArenaPtr arena;

View File

@ -3,7 +3,7 @@
#include <common/logger_useful.h>
#include <Common/ConcurrentBoundedQueue.h>
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ParallelInputsProcessor.h>
@ -26,7 +26,7 @@ namespace ErrorCodes
* - the completed blocks are added to a limited queue of finished blocks;
* - the main thread takes out completed blocks from the queue of finished blocks;
*/
-class UnionBlockInputStream final : public IProfilingBlockInputStream
+class UnionBlockInputStream final : public IBlockInputStream
{
private:
/// A block or an exception.

View File

@ -1,4 +1,4 @@
-#include <DataStreams/IProfilingBlockInputStream.h>
+#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
@ -35,14 +35,11 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
return;
/// For outputting additional information in some formats.
-if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(&from))
-{
-if (input->getProfileInfo().hasAppliedLimit())
-to.setRowsBeforeLimit(input->getProfileInfo().getRowsBeforeLimit());
+if (from.getProfileInfo().hasAppliedLimit())
+to.setRowsBeforeLimit(from.getProfileInfo().getRowsBeforeLimit());
-to.setTotals(input->getTotals());
-to.setExtremes(input->getExtremes());
-}
+to.setTotals(from.getTotals());
+to.setExtremes(from.getExtremes());
if (is_cancelled())
return;
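
With the interfaces merged, copyData can take profile info, totals and extremes directly from any IBlockInputStream, so the dynamic_cast and its extra scope go away. For context, a standalone model of the surrounding pump loop (simplified stand-in types, invented names):

// Standalone model of the copyData loop: pull blocks from the source and
// push them to the sink until exhaustion or cancellation.
// All types here are simplified stand-ins for the ClickHouse interfaces.
#include <functional>
#include <iostream>
#include <queue>
#include <string>

using Block = std::string; // stand-in; an empty block means "no more data"

struct Source
{
    std::queue<Block> blocks;
    Block read()
    {
        if (blocks.empty())
            return {};
        Block b = blocks.front();
        blocks.pop();
        return b;
    }
};

struct Sink
{
    void write(const Block & b) { std::cout << "wrote: " << b << '\n'; }
};

void copyData(Source & from, Sink & to, const std::function<bool()> & is_cancelled)
{
    while (true)
    {
        if (is_cancelled())
            return;
        Block block = from.read();
        if (block.empty())
            break; // source exhausted
        to.write(block);
    }
    // After the merge, extras like totals/extremes would be taken
    // directly from `from` here, with no dynamic_cast needed.
}

int main()
{
    Source from;
    from.blocks.push("block-1");
    from.blocks.push("block-2");
    Sink to;
    copyData(from, to, [] { return false; });
}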

View File

@ -9,6 +9,7 @@
#include <Common/AlignedBuffer.h>
#include <Formats/FormatSettings.h>
+#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>
@ -248,6 +249,12 @@ void DataTypeAggregateFunction::deserializeTextCSV(IColumn & column, ReadBuffer
}
+void DataTypeAggregateFunction::serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const
+{
+protobuf.writeAggregateFunction(function, static_cast<const ColumnAggregateFunction &>(column).getData()[row_num]);
+}
MutableColumnPtr DataTypeAggregateFunction::createColumn() const
{
return ColumnAggregateFunction::create(function);

View File

@ -56,6 +56,7 @@ public:
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
+void serializeProtobuf(const IColumn & column, size_t row_num, ProtobufWriter & protobuf) const override;
MutableColumnPtr createColumn() const override;

Some files were not shown because too many files have changed in this diff.