From 554d329064f7c2fbdd3099c472a874199ed62e6a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 21 Apr 2014 20:09:04 +0400 Subject: [PATCH 01/14] dbms: fixed WITH TOTALS and LIMIT/DISTINCT [#METR-10705]. --- .../DB/Columns/ColumnAggregateFunction.h | 6 +-- .../TotalsHavingBlockInputStream.h | 8 +-- .../TotalsHavingBlockInputStream.cpp | 49 +++++++++++-------- 3 files changed, 31 insertions(+), 32 deletions(-) diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index 097e4510666..a92ff765449 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -39,10 +39,8 @@ public: func = func_; } - AggregateFunctionPtr getAggregateFunction() - { - return func; - } + AggregateFunctionPtr getAggregateFunction() { return func; } + AggregateFunctionPtr getAggregateFunction() const { return func; } /// Захватить владение ареной. void addArena(ArenaPtr arena_) diff --git a/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h b/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h index e4fca8d9614..1644ad44a29 100644 --- a/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h @@ -38,13 +38,7 @@ public: return res.str(); } - const Block & getTotals() - { - if (totals && expression) - expression->execute(totals); - - return totals; - } + const Block & getTotals(); protected: Block readImpl(); diff --git a/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp b/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp index 680da3a2d41..b4adb71746c 100644 --- a/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp +++ b/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp @@ -19,6 +19,28 @@ static void finalize(Block & block) } } + +const Block & TotalsHavingBlockInputStream::getTotals() +{ + if (!totals) + { + /** Если totals_mode == AFTER_HAVING_AUTO, нужно решить, добавлять ли в TOTALS агрегаты для строк, + * не прошедших max_rows_to_group_by. + */ + if (overflow_aggregates && static_cast(passed_keys) / total_keys >= auto_include_threshold) + addToTotals(current_totals, overflow_aggregates, nullptr); + + finalize(current_totals); + totals = current_totals; + } + + if (totals && expression) + expression->execute(totals); + + return totals; +} + + Block TotalsHavingBlockInputStream::readImpl() { Block finalized; @@ -29,16 +51,7 @@ Block TotalsHavingBlockInputStream::readImpl() block = children[0]->read(); if (!block) - { - /** Если totals_mode==AFTER_HAVING_AUTO, нужно решить, добавлять ли в TOTALS агрегаты для строк, - * не прошедших max_rows_to_group_by. - */ - if (overflow_aggregates && static_cast(passed_keys) / total_keys >= auto_include_threshold) - addToTotals(current_totals, overflow_aggregates, nullptr); - finalize(current_totals); - totals = current_totals; return finalized; - } finalized = block; finalize(finalized); @@ -47,8 +60,8 @@ Block TotalsHavingBlockInputStream::readImpl() if (filter_column_name.empty() || totals_mode == TotalsMode::BEFORE_HAVING) { - /** Включая особую нулевую строку, если overflow_row=true. - * Предполагается, что если totals_mode=AFTER_HAVING_EXCLUSIVE, нам эту строку не дадут. + /** Включая особую нулевую строку, если overflow_row == true. + * Предполагается, что если totals_mode == AFTER_HAVING_EXCLUSIVE, нам эту строку не дадут. */ addToTotals(current_totals, block, nullptr); } @@ -126,8 +139,7 @@ Block TotalsHavingBlockInputStream::readImpl() } } -void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, - size_t rows) +void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, size_t rows) { bool init = !totals; @@ -137,9 +149,8 @@ void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, co for (size_t i = 0; i < block.columns(); ++i) { - ColumnWithNameAndType & current = block.getByPosition(i); - ColumnAggregateFunction * column = - dynamic_cast(&*current.column); + const ColumnWithNameAndType & current = block.getByPosition(i); + const ColumnAggregateFunction * column = dynamic_cast(&*current.column); if (!column) { @@ -176,23 +187,19 @@ void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, co data = target->getData()[0]; } - ColumnAggregateFunction::Container_t & vec = column->getData(); + const ColumnAggregateFunction::Container_t & vec = column->getData(); size_t size = std::min(vec.size(), rows); if (filter) { for (size_t j = 0; j < size; ++j) - { if ((*filter)[j]) function->merge(data, vec[j]); - } } else { for (size_t j = 0; j < size; ++j) - { function->merge(data, vec[j]); - } } } } From b07f478984b432377ab59910fb08b51df2e3fd21 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 21 Apr 2014 20:20:57 +0400 Subject: [PATCH 02/14] dbms: added tests [#METR-10705]. --- .../0_stateless/00037_totals_limit.reference | 31 +++++++++++++++++++ .../0_stateless/00037_totals_limit.sql | 1 + .../0_stateless/00038_totals_limit.reference | 3 ++ .../0_stateless/00038_totals_limit.sql | 1 + 4 files changed, 36 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/00037_totals_limit.reference create mode 100644 dbms/tests/queries/0_stateless/00037_totals_limit.sql create mode 100644 dbms/tests/queries/0_stateless/00038_totals_limit.reference create mode 100644 dbms/tests/queries/0_stateless/00038_totals_limit.sql diff --git a/dbms/tests/queries/0_stateless/00037_totals_limit.reference b/dbms/tests/queries/0_stateless/00037_totals_limit.reference new file mode 100644 index 00000000000..d9ed69f7ff0 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00037_totals_limit.reference @@ -0,0 +1,31 @@ +{ + "meta": + [ + { + "name": "count()", + "type": "UInt64" + }, + { + "name": "n", + "type": "UInt8" + } + ], + + "data": + [ + { + "count()": "1", + "n": 1 + } + ], + + "totals": + { + "count()": "3", + "n": 0 + }, + + "rows": 1, + + "rows_before_limit_at_least": 3 +} diff --git a/dbms/tests/queries/0_stateless/00037_totals_limit.sql b/dbms/tests/queries/0_stateless/00037_totals_limit.sql new file mode 100644 index 00000000000..62d09415fe9 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00037_totals_limit.sql @@ -0,0 +1 @@ +SELECT count(), arrayJoin([1, 2, 3]) AS n GROUP BY n WITH TOTALS LIMIT 1 FORMAT JSON diff --git a/dbms/tests/queries/0_stateless/00038_totals_limit.reference b/dbms/tests/queries/0_stateless/00038_totals_limit.reference new file mode 100644 index 00000000000..a594e1495c1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00038_totals_limit.reference @@ -0,0 +1,3 @@ +1 + +1 diff --git a/dbms/tests/queries/0_stateless/00038_totals_limit.sql b/dbms/tests/queries/0_stateless/00038_totals_limit.sql new file mode 100644 index 00000000000..09960c92eb4 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00038_totals_limit.sql @@ -0,0 +1 @@ +SELECT count() GROUP BY 1 WITH TOTALS LIMIT 1 From 345b1bfb44d14f304c64f16c21d0788f5e20f93f Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 22 Apr 2014 14:41:38 +0400 Subject: [PATCH 03/14] Merge --- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 53e06f9a741..60e35d621a5 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -211,6 +211,8 @@ bool ExpressionAnalyzer::needSignRewrite() return merge_tree->getName() == "CollapsingMergeTree"; if (const StorageDistributed * distributed = dynamic_cast(&*storage)) return !distributed->getSignColumnName().empty(); + if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&*storage)) + return replicated->getName() == "CollapsingReplicatedMergeTree"; } return false; } @@ -222,6 +224,8 @@ String ExpressionAnalyzer::getSignColumnName() return merge_tree->getSignColumnName(); if (const StorageDistributed * distributed = dynamic_cast(&*storage)) return distributed->getSignColumnName(); + if (const StorageReplicatedMergeTree * replicated = dynamic_cast(&*storage)) + return replicated->getSignColumnName(); return ""; } From c2b556f51b21655a7d231440c071db300629073d Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 22 Apr 2014 14:43:18 +0400 Subject: [PATCH 04/14] Merge --- libs/libzkutil/include/zkutil/LeaderElection.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/libzkutil/include/zkutil/LeaderElection.h b/libs/libzkutil/include/zkutil/LeaderElection.h index ae51ff542c0..61740f669d7 100644 --- a/libs/libzkutil/include/zkutil/LeaderElection.h +++ b/libs/libzkutil/include/zkutil/LeaderElection.h @@ -103,7 +103,7 @@ private: } WatchFuture future; - if (zookeeper.exists(*(it - 1), nullptr, &future)) + if (zookeeper.exists(path + "/" + *(it - 1), nullptr, &future)) { while (!shutdown) if (future.wait_for(std::chrono::seconds(2)) != std::future_status::timeout) From 052b2bd83b1ca134be5d33fbb72fe1a41fa8fde7 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 22 Apr 2014 14:46:38 +0400 Subject: [PATCH 05/14] zkcpp: added test showing that zkcpp blocks calls forever after session expiration. [#METR-10202] --- libs/libzkutil/include/zkutil/Types.h | 2 +- .../src/tests/zkcpp_expiration_test.cpp | 43 +++++++++++++++++++ libs/libzkutil/src/tests/zkutil_test.cpp | 6 --- 3 files changed, 44 insertions(+), 7 deletions(-) create mode 100644 libs/libzkutil/src/tests/zkcpp_expiration_test.cpp diff --git a/libs/libzkutil/include/zkutil/Types.h b/libs/libzkutil/include/zkutil/Types.h index 99fb1e988bb..84a92d8eb1c 100644 --- a/libs/libzkutil/include/zkutil/Types.h +++ b/libs/libzkutil/include/zkutil/Types.h @@ -26,7 +26,7 @@ typedef std::shared_ptr > OpResultsPtr; typedef std::vector Strings; typedef boost::function WatchFunction; + const std::string & path)> WatchFunction; struct WatchEventInfo { diff --git a/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp b/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp new file mode 100644 index 00000000000..150b4fac3ca --- /dev/null +++ b/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp @@ -0,0 +1,43 @@ +#include + +namespace zk = org::apache::zookeeper; + +void stateChanged(zk::WatchEvent::type event, zk::SessionState::type state, const std::string & path) +{ + std::cout << "state changed; event: " << zk::WatchEvent::toString(event) << ", state: " << zk::SessionState::toString(state) + << ", path: " << path << std::endl; +} + +int main() +{ + zk::ZooKeeper zookeeper; + zookeeper.init("example1:2181,example2:2181,example3:2181", 5000, nullptr); + + std::vector children; + zk::data::Stat stat; + zk::ReturnCode::type ret = zookeeper.getChildren("/", nullptr, children, stat); + + std::cout << "getChildren returned " << zk::ReturnCode::toString(ret) << std::endl; + std::cout << "children of /:" << std::endl; + for (const auto & s : children) + { + std::cout << s << std::endl; + } + + std::cout << "break connection to example1:2181,example2:2181,example3:2181 for at least 5 seconds and enter something" << std::endl; + std::string unused; + std::cin >> unused; + + children.clear(); + std::cout << "will getChildren" << std::endl; + ret = zookeeper.getChildren("/", nullptr, children, stat); + + std::cout << "getChildren returned " << zk::ReturnCode::toString(ret) << std::endl; + std::cout << "children of /:" << std::endl; + for (const auto & s : children) + { + std::cout << s << std::endl; + } + + return 0; +} diff --git a/libs/libzkutil/src/tests/zkutil_test.cpp b/libs/libzkutil/src/tests/zkutil_test.cpp index 41d06fdcfb5..29ee06a2a0f 100644 --- a/libs/libzkutil/src/tests/zkutil_test.cpp +++ b/libs/libzkutil/src/tests/zkutil_test.cpp @@ -48,12 +48,6 @@ int main(int argc, char ** argv) while (char * line = readline(":3 ")) { - if (zk.disconnected()) - { - std::cerr << "Disconnected" << std::endl; - break; - } - try { std::stringstream ss(line); From ad3954a28af0eef3a718eba7d12ebbfa984883cf Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 22 Apr 2014 14:55:36 +0400 Subject: [PATCH 06/14] zkutil: explained a test. [#METR-10202] --- libs/libzkutil/src/tests/nozk.sh | 15 +++++++++++++++ libs/libzkutil/src/tests/yeszk.sh | 6 ++++++ .../libzkutil/src/tests/zkcpp_expiration_test.cpp | 6 +++++- 3 files changed, 26 insertions(+), 1 deletion(-) create mode 100755 libs/libzkutil/src/tests/nozk.sh create mode 100755 libs/libzkutil/src/tests/yeszk.sh diff --git a/libs/libzkutil/src/tests/nozk.sh b/libs/libzkutil/src/tests/nozk.sh new file mode 100755 index 00000000000..9a754967e13 --- /dev/null +++ b/libs/libzkutil/src/tests/nozk.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +# Добавляет в файрвол правила, не пропускающие пакеты до серверов ZooKeeper. +# Используется для тестирования поведения программ при потере соединения с ZooKeeper. +# yeszk.sh производит обратные изменения. + +# Чтобы посмотреть, какие правила сейчас есть, используйте sudo iptables -L и sudo ip6tables -L + +sudo iptables -A OUTPUT -d example1 -j DROP +sudo iptables -A OUTPUT -d example2 -j DROP +sudo iptables -A OUTPUT -d example3 -j DROP +sudo ip6tables -A OUTPUT -d example1 -j DROP +sudo ip6tables -A OUTPUT -d example2 -j DROP +sudo ip6tables -A OUTPUT -d example3 -j DROP + diff --git a/libs/libzkutil/src/tests/yeszk.sh b/libs/libzkutil/src/tests/yeszk.sh new file mode 100755 index 00000000000..925938c8757 --- /dev/null +++ b/libs/libzkutil/src/tests/yeszk.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +# Выполняет действия, обратные nozk.sh + +cat nozk.sh | sed 's/-A/-D/g' | bash + diff --git a/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp b/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp index 150b4fac3ca..1bdc6e4004b 100644 --- a/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp +++ b/libs/libzkutil/src/tests/zkcpp_expiration_test.cpp @@ -2,6 +2,10 @@ namespace zk = org::apache::zookeeper; +/** Проверяет, правда ли, что вызовы в zkcpp при просроченной сессии блокируются навсегда. + * Разорвать сессию можно, например, так: `./nozk.sh && sleep 6s && ./yeszk.sh` + */ + void stateChanged(zk::WatchEvent::type event, zk::SessionState::type state, const std::string & path) { std::cout << "state changed; event: " << zk::WatchEvent::toString(event) << ", state: " << zk::SessionState::toString(state) @@ -29,7 +33,7 @@ int main() std::cin >> unused; children.clear(); - std::cout << "will getChildren" << std::endl; + std::cout << "will getChildren (this call will block forever, which seems to be zkcpp issue)" << std::endl; ret = zookeeper.getChildren("/", nullptr, children, stat); std::cout << "getChildren returned " << zk::ReturnCode::toString(ret) << std::endl; From dba95b4de593752baef4dd08d53ec1728b43e7c2 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 22 Apr 2014 14:57:19 +0400 Subject: [PATCH 07/14] Merge --- .../DB/Storages/StorageReplicatedMergeTree.h | 1 + .../Storages/StorageReplicatedMergeTree.cpp | 31 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 473ea3fefb8..d018f20b04d 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -34,6 +34,7 @@ public: const String & sign_column_ = "", const MergeTreeSettings & settings_ = MergeTreeSettings()); + void startup(); void shutdown(); ~StorageReplicatedMergeTree(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index eb6d1b6d462..158e9e46af2 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -60,14 +60,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } loadQueue(); - activateReplica(); - - leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper, - std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name); - - queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this); - for (size_t i = 0; i < settings_.replication_threads; ++i) - queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this)); } StoragePtr StorageReplicatedMergeTree::create( @@ -88,9 +80,7 @@ StoragePtr StorageReplicatedMergeTree::create( path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_); StoragePtr res_ptr = res->thisPtr(); - String endpoint_name = "ReplicatedMergeTree:" + res->replica_path; - InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(res->data, res_ptr); - res->endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, res->context.getInterserverIOHandler()); + res->startup(); return res_ptr; } @@ -1022,6 +1012,7 @@ void StorageReplicatedMergeTree::shutdown() LOG_TRACE(log, "Waiting for threads to finish"); if (is_leader_node) { + is_leader_node = false; merge_selecting_thread.join(); clear_old_blocks_thread.join(); } @@ -1031,6 +1022,24 @@ void StorageReplicatedMergeTree::shutdown() LOG_TRACE(log, "Threads finished"); } +void StorageReplicatedMergeTree::startup() +{ + shutdown_called = false; + + String endpoint_name = "ReplicatedMergeTree:" + replica_path; + InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(data, thisPtr()); + endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, context.getInterserverIOHandler()); + + activateReplica(); + + leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper, + std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name); + + queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this); + for (size_t i = 0; i < data.settings.replication_threads; ++i) + queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this)); +} + StorageReplicatedMergeTree::~StorageReplicatedMergeTree() { try From db9263f5f3bb0533820069b53f91204ecba4b131 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 22 Apr 2014 15:19:56 +0400 Subject: [PATCH 08/14] zkutil: not blocking calls if session is expired. [#METR-10202] --- libs/libzkutil/include/zkutil/ZooKeeper.h | 6 ++++++ libs/libzkutil/src/ZooKeeper.cpp | 25 +++++++++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h index 042d6b1831d..55abf4401e0 100644 --- a/libs/libzkutil/include/zkutil/ZooKeeper.h +++ b/libs/libzkutil/include/zkutil/ZooKeeper.h @@ -130,6 +130,12 @@ private: SessionState::type session_state; void stateChanged(WatchEvent::type event, SessionState::type state, const std::string& path); + + /** Бросает исключение, если сессия истекла. Почему-то zkcpp этого не делает, а вместо этого виснет (смотри zkcpp_expiration_test). + * Не очень надежно: возможно, вызов к zkcpp все же может повиснуть, если между проверкой и вызовом состояние успеет поменяться. + * Если это окажется проблемой, возможно, стоит избавиться от zkcpp. + */ + void checkNotExpired(); }; diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp index c1b5ad91b8a..f332265dc73 100644 --- a/libs/libzkutil/src/ZooKeeper.cpp +++ b/libs/libzkutil/src/ZooKeeper.cpp @@ -118,6 +118,12 @@ void ZooKeeper::stateChanged(WatchEvent::type event, SessionState::type state, c (*state_watch)(event, state, path); } +void ZooKeeper::checkNotExpired() +{ + if (disconnected()) + throw KeeperException(ReturnCode::SessionExpired); +} + bool ZooKeeper::disconnected() { Poco::ScopedLock lock(mutex); @@ -142,6 +148,7 @@ ACLs ZooKeeper::getDefaultACL() Strings ZooKeeper::getChildren( const std::string & path, Stat * stat, WatchFuture * watch) { + checkNotExpired(); Stat s; Strings res; CHECKED(impl.getChildren(path, watchForFuture(watch), res, s), path); @@ -153,6 +160,7 @@ Strings ZooKeeper::getChildren( bool ZooKeeper::tryGetChildren(const std::string & path, Strings & res, Stat * stat, WatchFuture * watch) { + checkNotExpired(); Stat s; ReturnCode::type code = impl.getChildren(path, watchForFuture(watch), res, s); if (!( code == ReturnCode::Ok || @@ -167,18 +175,18 @@ bool ZooKeeper::tryGetChildren(const std::string & path, Strings & res, std::string ZooKeeper::create(const std::string & path, const std::string & data, CreateMode::type mode) { - Poco::ScopedLock lock(mutex); + checkNotExpired(); std::string res; - CHECKED(impl.create(path, data, default_acl, mode, res), path); + CHECKED(impl.create(path, data, getDefaultACL(), mode, res), path); return res; } ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::string & data, CreateMode::type mode, std::string & pathCreated) { - Poco::ScopedLock lock(mutex); + checkNotExpired(); - ReturnCode::type code = impl.create(path, data, default_acl, mode, pathCreated); + ReturnCode::type code = impl.create(path, data, getDefaultACL(), mode, pathCreated); if (!( code == ReturnCode::Ok || code == ReturnCode::NoNode || code == ReturnCode::NodeExists || @@ -189,11 +197,13 @@ ReturnCode::type ZooKeeper::tryCreate(const std::string & path, const std::strin void ZooKeeper::remove(const std::string & path, int32_t version) { + checkNotExpired(); CHECKED(impl.remove(path, version), path); } ReturnCode::type ZooKeeper::tryRemove(const std::string & path, int32_t version) { + checkNotExpired(); ReturnCode::type code = impl.remove(path, version); if (!( code == ReturnCode::Ok || code == ReturnCode::NoNode || @@ -205,6 +215,7 @@ ReturnCode::type ZooKeeper::tryRemove(const std::string & path, int32_t version) bool ZooKeeper::exists(const std::string & path, Stat * stat, WatchFuture * watch) { + checkNotExpired(); Stat s; ReturnCode::type code = impl.exists(path, watchForFuture(watch), s); if (!( code == ReturnCode::Ok || @@ -219,6 +230,7 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat, WatchFuture * watc std::string ZooKeeper::get(const std::string & path, Stat * stat, WatchFuture * watch) { + checkNotExpired(); std::string res; Stat s; CHECKED(impl.get(path, watchForFuture(watch), res, s), path); @@ -229,6 +241,7 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, WatchFuture * bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, WatchFuture * watch) { + checkNotExpired(); Stat s; ReturnCode::type code = impl.get(path, watchForFuture(watch), res, s); if (!( code == ReturnCode::Ok || @@ -243,6 +256,7 @@ bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, void ZooKeeper::set(const std::string & path, const std::string & data, int32_t version, Stat * stat) { + checkNotExpired(); Stat s; CHECKED(impl.set(path, data, version, s), path); if (stat) @@ -252,6 +266,7 @@ void ZooKeeper::set(const std::string & path, const std::string & data, int32_t ReturnCode::type ZooKeeper::trySet(const std::string & path, const std::string & data, int32_t version, Stat * stat) { + checkNotExpired(); Stat s; ReturnCode::type code = impl.set(path, data, version, s); if (!( code == ReturnCode::Ok || @@ -265,6 +280,7 @@ ReturnCode::type ZooKeeper::trySet(const std::string & path, const std::string & OpResultsPtr ZooKeeper::multi(const Ops & ops) { + checkNotExpired(); OpResultsPtr res = std::make_shared(); CHECKED_WITHOUT_PATH(impl.multi(ops, *res)); for (size_t i = 0; i < res->size(); ++i) @@ -277,6 +293,7 @@ OpResultsPtr ZooKeeper::multi(const Ops & ops) ReturnCode::type ZooKeeper::tryMulti(const Ops & ops, OpResultsPtr * out_results) { + checkNotExpired(); OpResultsPtr results = std::make_shared(); ReturnCode::type code = impl.multi(ops, *results); if (out_results) From c139a6d209b12cc18b4022f8e72bb073b9c768be Mon Sep 17 00:00:00 2001 From: Sergey Fedorov Date: Tue, 22 Apr 2014 19:34:59 +0400 Subject: [PATCH 09/14] dbms: fixed bug in merge tree output stream: compression of min block now runs at the right moment [METR-10570] --- dbms/include/DB/Core/Defines.h | 2 +- .../MergeTree/MergedBlockOutputStream.h | 26 ++++++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/dbms/include/DB/Core/Defines.h b/dbms/include/DB/Core/Defines.h index 89b2461bf44..79f69403e72 100644 --- a/dbms/include/DB/Core/Defines.h +++ b/dbms/include/DB/Core/Defines.h @@ -23,7 +23,7 @@ /// Какими блоками по-умолчанию читаются и пишутся данные (в числе строк). #define DEFAULT_BLOCK_SIZE 1048576 /// То же самое, но для операций слияния. Меньше DEFAULT_BLOCK_SIZE для экономии оперативки (так как читаются все столбцы). -#define DEFAULT_MERGE_BLOCK_SIZE 10000 +#define DEFAULT_MERGE_BLOCK_SIZE 8192 #define DEFAULT_MAX_QUERY_SIZE 65536 #define SHOW_CHARS_ON_SYNTAX_ERROR 160L diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index ac30e09c4cd..9399a7f0091 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -135,16 +135,19 @@ protected: else { limit = storage.index_granularity; + + /// Уже могло накопиться достаточно данных для сжатия в новый блок. + if (stream.compressed.offset() >= min_compress_block_size) + stream.compressed.next(); + writeIntBinary(stream.plain_hashing.count(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks); } type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit); - /// Уже могло накопиться достаточно данных для сжатия в новый блок. - if (stream.compressed.offset() >= min_compress_block_size) - stream.compressed.next(); - else - stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. + + stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. + prev_mark += limit; } } @@ -166,16 +169,19 @@ protected: else { limit = storage.index_granularity; + + /// Уже могло накопиться достаточно данных для сжатия в новый блок. + if (stream.compressed.offset() >= min_compress_block_size) + stream.compressed.next(); + writeIntBinary(stream.plain_hashing.count(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks); } type.serializeBinary(column, stream.compressed, prev_mark, limit); - /// Уже могло накопиться достаточно данных для сжатия в новый блок. - if (stream.compressed.offset() >= min_compress_block_size) - stream.compressed.next(); - else - stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. + + stream.compressed.nextIfAtEnd(); /// Чтобы вместо засечек, указывающих на конец сжатого блока, были засечки, указывающие на начало следующего. + prev_mark += limit; } } From 4befbd1f20178d07fc877f7875be1e83e682a3da Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 22 Apr 2014 21:55:30 +0400 Subject: [PATCH 10/14] dbms: Client: fixed error [#METR-10719]. --- dbms/src/Client/Client.cpp | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/dbms/src/Client/Client.cpp b/dbms/src/Client/Client.cpp index 2c9e38b02ff..91f52adf3b4 100644 --- a/dbms/src/Client/Client.cpp +++ b/dbms/src/Client/Client.cpp @@ -899,10 +899,10 @@ public: main_description.add_options() ("help", "produce help message") ("config-file,c", boost::program_options::value (), "config-file path") - ("host,h", boost::program_options::value ()->default_value("localhost"), "server host") - ("port,p", boost::program_options::value ()->default_value(9000), "server port") + ("host,h", boost::program_options::value ()->implicit_value("")->default_value("localhost"), "server host") + ("port", boost::program_options::value ()->default_value(9000), "server port") ("user,u", boost::program_options::value (), "user") - ("password,p", boost::program_options::value (), "password") + ("password", boost::program_options::value (), "password") ("query,q", boost::program_options::value (), "query") ("database,d", boost::program_options::value (), "database") ("multiline,m", "multiline") @@ -929,7 +929,9 @@ public: boost::program_options::store(parsed, options); /// Демонстрация help message - if (options.count("help")) { + if (options.count("help") + || (options.count("host") && (options["host"].as().empty() || options["host"].as() == "elp"))) + { std::cout << main_description << "\n"; std::cout << external_description << "\n"; exit(0); @@ -939,12 +941,11 @@ public: /// Опции командной строки, составленные только из аргументов, не перечисленных в main_description. char newargc = to_pass_further.size() + 1; - char *new_argv[newargc]; + const char * new_argv[newargc]; + + new_argv[0] = ""; for (size_t i = 0; i < to_pass_further.size(); ++i) - { - new_argv[i+1] = new char[to_pass_further[i].size() + 1]; - std::strcpy(new_argv[i+1], to_pass_further[i].c_str()); - } + new_argv[i + 1] = to_pass_further[i].c_str(); /// Разбиваем на интервалы внешних таблиц. std::vector positions; @@ -957,11 +958,12 @@ public: size_t cnt = positions.size(); size_t stdin_count = 0; - for (size_t i = 1; i < cnt-1; ++i) + for (size_t i = 1; i + 1 < cnt; ++i) { boost::program_options::variables_map external_options; boost::program_options::store(boost::program_options::parse_command_line( - positions[i+1] - positions[i], &new_argv[positions[i]], external_description), external_options); + positions[i + 1] - positions[i], &new_argv[positions[i]], external_description), external_options); + try { external_tables.push_back(ExternalTable(external_options)); @@ -974,7 +976,7 @@ public: { std::string text = e.displayText(); std::cerr << "Code: " << e.code() << ". " << text << std::endl; - std::cerr << "Table #" << i << std::endl << std::endl; + std::cerr << "Table №" << i << std::endl << std::endl; exit(e.code()); } } From 6bec60bb39aad25e7d2a4e9c729ce1a3fd76270e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 22 Apr 2014 22:35:40 +0400 Subject: [PATCH 11/14] dbms: external tables: fixed error [#METR-10071]. --- dbms/include/DB/Common/ExternalTable.h | 55 +++++++------------------- dbms/src/Client/Client.cpp | 33 ++++++++-------- 2 files changed, 31 insertions(+), 57 deletions(-) diff --git a/dbms/include/DB/Common/ExternalTable.h b/dbms/include/DB/Common/ExternalTable.h index 8095b117df7..55ef51b2189 100644 --- a/dbms/include/DB/Common/ExternalTable.h +++ b/dbms/include/DB/Common/ExternalTable.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -27,7 +28,7 @@ public: /// Описание структуры таблицы: (имя столбца, имя типа данных) std::vector > structure; - ReadBuffer *read_buffer; + std::unique_ptr read_buffer; Block sample_block; virtual ~BaseExternalTable() {}; @@ -36,7 +37,7 @@ public: virtual void initReadBuffer() {}; /// Инициализировать sample_block по структуре таблицы сохраненной в structure - virtual void initSampleBlock(const Context &context) + virtual void initSampleBlock(const Context & context) { for (size_t i = 0; i < structure.size(); ++i) { @@ -49,7 +50,7 @@ public: } /// Получить данные таблицы - пару (поток с содержимым таблицы, имя таблицы) - virtual ExternalTableData getData(const Context &context) + virtual ExternalTableData getData(const Context & context) { initReadBuffer(); initSampleBlock(context); @@ -67,7 +68,7 @@ protected: format = ""; structure.clear(); sample_block = Block(); - read_buffer = NULL; + read_buffer.reset(); } /// Функция для отладочного вывода информации @@ -81,23 +82,10 @@ protected: std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl; } - static std::vector split(const std::string & s, const std::string &d) + static std::vector split(const std::string & s, const std::string & d) { std::vector res; - std::string now; - for (size_t i = 0; i < s.size(); ++i) - { - if (d.find(s[i]) != std::string::npos) - { - if (!now.empty()) - res.push_back(now); - now = ""; - continue; - } - now += s[i]; - } - if (!now.empty()) - res.push_back(now); + boost::split(res, s, boost::algorithm::is_any_of(d), boost::algorithm::token_compress_on); return res; } @@ -110,7 +98,7 @@ protected: throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS); for (size_t i = 0; i < vals.size(); i += 2) - structure.push_back(std::make_pair(vals[i], vals[i+1])); + structure.emplace_back(vals[i], vals[i + 1]); } /// Построить вектор structure по текстовому полю types @@ -119,7 +107,7 @@ protected: std::vector vals = split(argument, " ,"); for (size_t i = 0; i < vals.size(); ++i) - structure.push_back(std::make_pair("_" + toString(i + 1), vals[i])); + structure.emplace_back("_" + toString(i + 1), vals[i]); } }; @@ -131,9 +119,9 @@ public: void initReadBuffer() { if (file == "-") - read_buffer = new ReadBufferFromIStream(std::cin); + read_buffer.reset(new ReadBufferFromFileDescriptor(STDIN_FILENO)); else - read_buffer = new ReadBufferFromFile(file); + read_buffer.reset(new ReadBufferFromFile(file)); } /// Извлечение параметров из variables_map, которая строится по командной строке клиента @@ -155,24 +143,9 @@ public: throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); if (external_options.count("structure")) - { - std::vector temp = external_options["structure"].as>(); - - std::string argument; - for (size_t i = 0; i < temp.size(); ++i) - argument = argument + temp[i] + " "; - - parseStructureFromStructureField(argument); - - } + parseStructureFromStructureField(external_options["structure"].as()); else if (external_options.count("types")) - { - std::vector temp = external_options["types"].as>(); - std::string argument; - for (size_t i = 0; i < temp.size(); ++i) - argument = argument + temp[i] + " "; - parseStructureFromTypesField(argument); - } + parseStructureFromTypesField(external_options["types"].as()); else throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS); } @@ -191,7 +164,7 @@ public: void handlePart(const Poco::Net::MessageHeader& header, std::istream& stream) { /// Буфер инициализируется здесь, а не в виртуальной функции initReadBuffer - read_buffer = new ReadBufferFromIStream(stream); + read_buffer.reset(new ReadBufferFromIStream(stream)); /// Извлекаем коллекцию параметров из MessageHeader Poco::Net::NameValueCollection content; diff --git a/dbms/src/Client/Client.cpp b/dbms/src/Client/Client.cpp index 91f52adf3b4..882f107ea87 100644 --- a/dbms/src/Client/Client.cpp +++ b/dbms/src/Client/Client.cpp @@ -116,7 +116,7 @@ private: bool written_first_block; /// Информация о внешних таблицах - std::vector external_tables; + std::list external_tables; void initialize(Poco::Util::Application & self) @@ -521,8 +521,9 @@ private: throw Exception("External tables could be sent only with select query", ErrorCodes::BAD_ARGUMENTS); std::vector data; - for (size_t i = 0; i < external_tables.size(); ++i) - data.push_back(external_tables[i].getData(context)); + for (auto & table : external_tables) + data.emplace_back(table.getData(context)); + connection->sendExternalTablesData(data); } @@ -898,13 +899,13 @@ public: boost::program_options::options_description main_description("Main options"); main_description.add_options() ("help", "produce help message") - ("config-file,c", boost::program_options::value (), "config-file path") - ("host,h", boost::program_options::value ()->implicit_value("")->default_value("localhost"), "server host") - ("port", boost::program_options::value ()->default_value(9000), "server port") - ("user,u", boost::program_options::value (), "user") - ("password", boost::program_options::value (), "password") - ("query,q", boost::program_options::value (), "query") - ("database,d", boost::program_options::value (), "database") + ("config-file,c", boost::program_options::value(), "config-file path") + ("host,h", boost::program_options::value()->implicit_value("")->default_value("localhost"), "server host") + ("port", boost::program_options::value()->default_value(9000), "server port") + ("user,u", boost::program_options::value(), "user") + ("password", boost::program_options::value(), "password") + ("query,q", boost::program_options::value(), "query") + ("database,d", boost::program_options::value(), "database") ("multiline,m", "multiline") ("multiquery,n", "multiquery") APPLY_FOR_SETTINGS(DECLARE_SETTING) @@ -916,11 +917,11 @@ public: /// Перечисляем опции командной строки относящиеся к внешним таблицам boost::program_options::options_description external_description("External tables options"); external_description.add_options() - ("file", boost::program_options::value (), "data file or - for stdin") - ("name", boost::program_options::value ()->default_value("_data"), "name of the table") - ("format", boost::program_options::value ()->default_value("TabSeparated"), "data format") - ("structure", boost::program_options::value> ()->multitoken(), "structure") - ("types", boost::program_options::value> ()->multitoken(), "types") + ("file", boost::program_options::value(), "data file or - for stdin") + ("name", boost::program_options::value()->default_value("_data"), "name of the table") + ("format", boost::program_options::value()->default_value("TabSeparated"), "data format") + ("structure", boost::program_options::value(), "structure") + ("types", boost::program_options::value(), "types") ; /// Парсим основные опции командной строки @@ -966,7 +967,7 @@ public: try { - external_tables.push_back(ExternalTable(external_options)); + external_tables.emplace_back(external_options); if (external_tables.back().file == "-") ++stdin_count; if (stdin_count > 1) From a3caebf31b0386f14a79f00ab5944da10153c2eb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 23 Apr 2014 02:43:55 +0400 Subject: [PATCH 12/14] dbms: tiny improvement [#METR-10931]. --- .../DB/Storages/MergeTree/MergeTreeReader.h | 6 +++--- .../Storages/MergeTree/MergedBlockOutputStream.h | 10 +++++----- dbms/include/DB/Storages/StorageLog.h | 4 ++-- dbms/include/DB/Storages/StorageTinyLog.h | 4 ++-- dbms/src/Storages/StorageLog.cpp | 16 ++++++++-------- dbms/src/Storages/StorageTinyLog.cpp | 12 ++++++------ 6 files changed, 26 insertions(+), 26 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index d85a3366639..d3582cd9e3d 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -263,7 +263,7 @@ private: } }; - typedef std::map > FileStreams; + typedef std::map > FileStreams; String path; FileStreams streams; @@ -293,13 +293,13 @@ private: + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) - streams.insert(std::make_pair(size_name, new Stream( + streams.emplace(size_name, std::unique_ptr(new Stream( path + escaped_size_name, uncompressed_cache, mark_cache))); addStream(name, *type_arr->getNestedType(), level + 1); } else - streams[name] = new Stream(path + escaped_column_name, uncompressed_cache, mark_cache); + streams[name].reset(new Stream(path + escaped_column_name, uncompressed_cache, mark_cache)); } void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read, diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index 9399a7f0091..a47b844f174 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -70,7 +70,7 @@ protected: } }; - typedef std::map > ColumnStreams; + typedef std::map > ColumnStreams; void addStream(const String & path, const String & name, const IDataType & type, size_t level = 0, String filename = "") { @@ -88,20 +88,20 @@ protected: String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name)) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); - column_streams[size_name] = new ColumnStream( + column_streams[size_name].reset(new ColumnStream( escaped_size_name, path + escaped_size_name + ".bin", path + escaped_size_name + ".mrk", - max_compress_block_size); + max_compress_block_size)); addStream(path, name, *type_arr->getNestedType(), level + 1); } else - column_streams[name] = new ColumnStream( + column_streams[name].reset(new ColumnStream( escaped_column_name, path + escaped_column_name + ".bin", path + escaped_column_name + ".mrk", - max_compress_block_size); + max_compress_block_size)); } diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h index c0520ff4cc6..a33260c5a43 100644 --- a/dbms/include/DB/Storages/StorageLog.h +++ b/dbms/include/DB/Storages/StorageLog.h @@ -68,7 +68,7 @@ private: CompressedReadBuffer compressed; }; - typedef std::map > FileStreams; + typedef std::map > FileStreams; FileStreams streams; void addStream(const String & name, const IDataType & type, size_t level = 0); @@ -109,7 +109,7 @@ private: typedef std::vector > MarksForColumns; - typedef std::map > FileStreams; + typedef std::map > FileStreams; FileStreams streams; typedef std::set OffsetColumns; diff --git a/dbms/include/DB/Storages/StorageTinyLog.h b/dbms/include/DB/Storages/StorageTinyLog.h index 8c6ed56d37b..3a0c1a77177 100644 --- a/dbms/include/DB/Storages/StorageTinyLog.h +++ b/dbms/include/DB/Storages/StorageTinyLog.h @@ -48,7 +48,7 @@ private: CompressedReadBuffer compressed; }; - typedef std::map > FileStreams; + typedef std::map > FileStreams; FileStreams streams; void addStream(const String & name, const IDataType & type, size_t level = 0); @@ -83,7 +83,7 @@ private: } }; - typedef std::map > FileStreams; + typedef std::map > FileStreams; FileStreams streams; typedef std::set OffsetColumns; diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index b1dd7ee1479..29857e72716 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -166,7 +166,7 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type, { String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) - streams.insert(std::make_pair(size_name, new Stream( + streams.emplace(size_name, std::unique_ptr(new Stream( storage.files[size_name].data_file.path(), mark_number ? storage.files[size_name].marks[mark_number].offset @@ -177,22 +177,22 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type, else if (const DataTypeNested * type_nested = dynamic_cast(&type)) { String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); - streams[size_name] = new Stream( + streams[size_name].reset(new Stream( storage.files[size_name].data_file.path(), mark_number ? storage.files[size_name].marks[mark_number].offset - : 0); + : 0)); const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); } else - streams[name] = new Stream( + streams[name].reset(new Stream( storage.files[name].data_file.path(), mark_number ? storage.files[name].marks[mark_number].offset - : 0); + : 0)); } @@ -292,7 +292,7 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type { String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) - streams.insert(std::make_pair(size_name, new Stream( + streams.emplace(size_name, std::unique_ptr(new Stream( storage.files[size_name].data_file.path(), storage.max_compress_block_size))); addStream(name, *type_arr->getNestedType(), level + 1); @@ -300,14 +300,14 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type else if (const DataTypeNested * type_nested = dynamic_cast(&type)) { String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); - streams[size_name] = new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size); + streams[size_name].reset(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size)); const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); } else - streams[name] = new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size); + streams[name].reset(new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size)); } diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 73cf65987e0..ffdd00c2cb8 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -116,21 +116,21 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t { String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) - streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path()))); + streams.emplace(size_name, std::unique_ptr(new Stream(storage.files[size_name].data_file.path()))); addStream(name, *type_arr->getNestedType(), level + 1); } else if (const DataTypeNested * type_nested = dynamic_cast(&type)) { String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); - streams[size_name] = new Stream(storage.files[size_name].data_file.path()); + streams[size_name].reset(new Stream(storage.files[size_name].data_file.path())); const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); } else - streams[name] = new Stream(storage.files[name].data_file.path()); + streams[name].reset(new Stream(storage.files[name].data_file.path())); } @@ -200,21 +200,21 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType & { String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); if (!streams.count(size_name)) - streams.insert(std::make_pair(size_name, new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size))); + streams.emplace(size_name, std::unique_ptr(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size))); addStream(name, *type_arr->getNestedType(), level + 1); } else if (const DataTypeNested * type_nested = dynamic_cast(&type)) { String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level); - streams[size_name] = new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size); + streams[size_name].reset(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size)); const NamesAndTypesList & columns = *type_nested->getNestedTypesList(); for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it) addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, level + 1); } else - streams[name] = new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size); + streams[name].reset(new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size)); } From b821e1420d650d01dce371af71a462b7dbca093c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 23 Apr 2014 02:52:00 +0400 Subject: [PATCH 13/14] Fixed style [#METR-10931]. --- dbms/include/DB/Functions/FunctionsLogical.h | 3 ++- .../DB/Storages/MergeTree/MergeTreeBlockInputStream.h | 10 ++++++---- dbms/include/DB/Storages/MergeTree/PKCondition.h | 2 +- .../Functions/tests/logical_functions_performance.cpp | 3 ++- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsLogical.h b/dbms/include/DB/Functions/FunctionsLogical.h index 6d9e1057167..7d8b7dd738b 100644 --- a/dbms/include/DB/Functions/FunctionsLogical.h +++ b/dbms/include/DB/Functions/FunctionsLogical.h @@ -91,7 +91,8 @@ struct AssociativeOperationImpl /// Выбрасывает N последних столбцов из in (если их меньше, то все) и кладет в result их комбинацию. static void execute(UInt8ColumnPtrs & in, UInt8Container & result) { - if (N > in.size()){ + if (N > in.size()) + { AssociativeOperationImpl::execute(in, result); return; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index e75ed3fa675..fc865bda387 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -28,7 +28,8 @@ public: { std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end()); - if (prewhere_actions){ + if (prewhere_actions) + { pre_column_names = prewhere_actions->getRequiredColumns(); if (pre_column_names.empty()) pre_column_names.push_back(column_names[0]); @@ -132,9 +133,9 @@ protected: return res; } - for (size_t i = 0; i < ranges_to_read.size(); ++i){ + for (size_t i = 0; i < ranges_to_read.size(); ++i) + { const MarkRange & range = ranges_to_read[i]; - reader->readRange(range.begin, range.end, res); } @@ -150,7 +151,8 @@ protected: /// Прочитаем в нужных отрезках остальные столбцы и составим для них свой фильтр. size_t pre_filter_pos = 0; size_t post_filter_pos = 0; - for (size_t i = 0; i < ranges_to_read.size(); ++i){ + for (size_t i = 0; i < ranges_to_read.size(); ++i) + { const MarkRange & range = ranges_to_read[i]; size_t begin = range.begin; diff --git a/dbms/include/DB/Storages/MergeTree/PKCondition.h b/dbms/include/DB/Storages/MergeTree/PKCondition.h index b3fc269bb9a..7b523768235 100644 --- a/dbms/include/DB/Storages/MergeTree/PKCondition.h +++ b/dbms/include/DB/Storages/MergeTree/PKCondition.h @@ -308,7 +308,7 @@ private: RPNElement(Function function_) : function(function_) {} RPNElement(Function function_, size_t key_column_) : function(function_), key_column(key_column_) {} RPNElement(Function function_, size_t key_column_, const Range & range_) - : function(function_), range(range_), key_column(key_column_){} + : function(function_), range(range_), key_column(key_column_) {} String toString(); diff --git a/dbms/src/Functions/tests/logical_functions_performance.cpp b/dbms/src/Functions/tests/logical_functions_performance.cpp index cf6bcb22237..740ddb3dd1a 100644 --- a/dbms/src/Functions/tests/logical_functions_performance.cpp +++ b/dbms/src/Functions/tests/logical_functions_performance.cpp @@ -44,7 +44,8 @@ struct AssociativeOperationImpl /// Выбрасывает N последних столбцов из in (если их меньше, то все) и кладет в result их комбинацию. static void execute(UInt8ColumnPtrs & in, UInt8Container & result) { - if (N > in.size()){ + if (N > in.size()) + { AssociativeOperationImpl::execute(in, result); return; } From aa3c0b6a8d4f5fe4ab18ffdda25b069ff146894f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 23 Apr 2014 02:58:05 +0400 Subject: [PATCH 14/14] dbms: tiny improvement [#METR-10931]. --- .../DB/Storages/MergeTree/MergeTreeBlockInputStream.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index fc865bda387..803b595bfe9 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -85,9 +85,9 @@ protected: if (!reader) { UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL; - reader = new MergeTreeReader(path, column_names, uncompressed_cache, storage); + reader.reset(new MergeTreeReader(path, column_names, uncompressed_cache, storage)); if (prewhere_actions) - pre_reader = new MergeTreeReader(path, pre_column_names, uncompressed_cache, storage); + pre_reader.reset(new MergeTreeReader(path, pre_column_names, uncompressed_cache, storage)); } if (prewhere_actions) @@ -245,7 +245,7 @@ protected: * Чтобы при создании многих источников, но одновременном чтении только из нескольких, * буферы не висели в памяти. */ - reader = nullptr; + reader.reset(); } return res; @@ -263,8 +263,8 @@ private: MarkRanges remaining_mark_ranges; /// В каких диапазонах засечек еще не прочли. /// В порядке убывания номеров, чтобы можно было выбрасывать из конца. bool use_uncompressed_cache; - Poco::SharedPtr reader; - Poco::SharedPtr pre_reader; + std::unique_ptr reader; + std::unique_ptr pre_reader; ExpressionActionsPtr prewhere_actions; String prewhere_column; bool remove_prewhere_column;