From 3a8ca70d1f54a24231af32a7ff8cfe435cfe2c82 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 6 Apr 2018 19:06:07 +0300 Subject: [PATCH 01/21] fix races in leader election [#CLICKHOUSE-3533] --- dbms/src/Common/ZooKeeper/LeaderElection.h | 22 +++--- .../ReplicatedMergeTreeCleanupThread.cpp | 2 +- .../ReplicatedMergeTreeRestartingThread.cpp | 49 +++--------- .../Storages/StorageReplicatedMergeTree.cpp | 74 ++++++++++++++----- .../src/Storages/StorageReplicatedMergeTree.h | 15 ++-- 5 files changed, 88 insertions(+), 74 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/LeaderElection.h b/dbms/src/Common/ZooKeeper/LeaderElection.h index 8dd9b1831b1..1786cc76510 100644 --- a/dbms/src/Common/ZooKeeper/LeaderElection.h +++ b/dbms/src/Common/ZooKeeper/LeaderElection.h @@ -41,10 +41,15 @@ public: createNode(); } - void yield() + void shutdown() { - releaseNode(); - createNode(); + if (shutdown_called) + return; + + shutdown_called = true; + event->set(); + if (thread.joinable()) + thread.join(); } ~LeaderElection() @@ -62,14 +67,14 @@ private: std::string node_name; std::thread thread; - std::atomic shutdown {false}; + std::atomic shutdown_called {false}; zkutil::EventPtr event = std::make_shared(); CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection}; void createNode() { - shutdown = false; + shutdown_called = false; node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier); std::string node_path = node->getPath(); @@ -80,16 +85,13 @@ private: void releaseNode() { - shutdown = true; - event->set(); - if (thread.joinable()) - thread.join(); + shutdown(); node = nullptr; } void threadFunction() { - while (!shutdown) + while (!shutdown_called) { bool success = false; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index f7dca129bd3..fb3276fcfab 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -52,7 +52,7 @@ void ReplicatedMergeTreeCleanupThread::iterate() /// This is loose condition: no problem if we actually had lost leadership at this moment /// and two replicas will try to do cleanup simultaneously. - if (storage.is_leader_node) + if (storage.is_leader) { clearOldLogs(); clearOldBlocks(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 6b20b5c86c1..1cd958c60d6 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -17,7 +17,6 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric ReadonlyReplica; - extern const Metric LeaderReplica; } @@ -139,7 +138,7 @@ void ReplicatedMergeTreeRestartingThread::run() prev_time_of_check_delay = current_time; /// We give up leadership if the relative lag is greater than threshold. - if (storage.is_leader_node + if (storage.is_leader && relative_delay > static_cast(storage.data.settings.min_relative_delay_to_yield_leadership)) { LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold (" @@ -147,11 +146,11 @@ void ReplicatedMergeTreeRestartingThread::run() ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership); - storage.is_leader_node = false; - CurrentMetrics::sub(CurrentMetrics::LeaderReplica); - if (storage.merge_selecting_thread.joinable()) - storage.merge_selecting_thread.join(); - storage.leader_election->yield(); + storage.exitLeaderElection(); + /// NOTE: enterLeaderElection() can throw if node creation in ZK fails. + /// This is bad because we can end up without a leader on any replica. + /// In this case we rely on the fact that the session will expire and we will reconnect. + storage.enterLeaderElection(); } } } @@ -169,6 +168,8 @@ void ReplicatedMergeTreeRestartingThread::run() storage.data_parts_exchange_endpoint_holder->cancelForever(); storage.data_parts_exchange_endpoint_holder = nullptr; + /// Cancel fetches and merges to force the queue_task to finish ASAP. + storage.fetcher.blocker.cancelForever(); storage.merger.merges_blocker.cancelForever(); partialShutdown(); @@ -195,12 +196,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() updateQuorumIfWeHavePart(); if (storage.data.settings.replicated_can_become_leader) - storage.leader_election = std::make_shared( - storage.zookeeper_path + "/leader_election", - *storage.current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election, - /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method. - [this] { storage.becomeLeader(); CurrentMetrics::add(CurrentMetrics::LeaderReplica); }, - storage.replica_name); + storage.enterLeaderElection(); /// Anything above can throw a KeeperException if something is wrong with ZK. /// Anything below should not throw exceptions. @@ -222,7 +218,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() catch (...) { storage.replica_is_active_node = nullptr; - storage.leader_election = nullptr; try { @@ -366,17 +361,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.replica_is_active_node = nullptr; LOG_TRACE(log, "Waiting for threads to finish"); - { - std::lock_guard lock(storage.leader_node_mutex); - bool old_val = true; - if (storage.is_leader_node.compare_exchange_strong(old_val, false)) - { - CurrentMetrics::sub(CurrentMetrics::LeaderReplica); - if (storage.merge_selecting_thread.joinable()) - storage.merge_selecting_thread.join(); - } - } + storage.exitLeaderElection(); + if (storage.queue_updating_thread.joinable()) storage.queue_updating_thread.join(); @@ -384,20 +371,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.alter_thread.reset(); storage.part_check_thread.stop(); - /// Yielding leadership only after finish of merge_selecting_thread. - /// Otherwise race condition with parallel run of merge selecting thread on different servers is possible. - /// - /// On the other hand, leader_election could call becomeLeader() from own thread after - /// merge_selecting_thread is finished and restarting_thread is destroyed. - /// becomeLeader() recreates merge_selecting_thread and it becomes joinable again, even restarting_thread is destroyed. - /// But restarting_thread is responsible to stop merge_selecting_thread. - /// It will lead to std::terminate in ~StorageReplicatedMergeTree(). - /// Such behaviour was rarely observed on DROP queries. - /// Therefore we need either avoid becoming leader after first shutdown call (more deliberate choice), - /// either manually wait merge_selecting_thread.join() inside ~StorageReplicatedMergeTree(), either or something third. - /// So, we added shutdown check in becomeLeader() and made its creation and deletion atomic. - storage.leader_election = nullptr; - LOG_TRACE(log, "Threads finished"); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 39213653478..1548537e390 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -59,6 +59,12 @@ namespace ProfileEvents extern const Event DataAfterMergeDiffersFromReplica; } +namespace CurrentMetrics +{ + extern const Metric LeaderReplica; +} + + namespace DB { @@ -1883,7 +1889,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() && cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right); }; - while (!shutdown_called && is_leader_node) + while (is_leader) { bool success = false; @@ -1932,7 +1938,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() tryLogCurrentException(__PRETTY_FUNCTION__); } - if (shutdown_called || !is_leader_node) + if (!is_leader) break; if (!success) @@ -2037,23 +2043,55 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n } -void StorageReplicatedMergeTree::becomeLeader() +void StorageReplicatedMergeTree::enterLeaderElection() { - std::lock_guard lock(leader_node_mutex); + auto callback = [this]() + { + CurrentMetrics::add(CurrentMetrics::LeaderReplica); + LOG_INFO(log, "Became leader"); - if (shutdown_called) + is_leader = true; + merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this); + }; + + try + { + leader_election = std::make_shared( + zookeeper_path + "/leader_election", + *current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election, + /// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method. + callback, + replica_name); + } + catch (...) + { + leader_election = nullptr; + throw; + } +} + +void StorageReplicatedMergeTree::exitLeaderElection() +{ + if (!leader_election) return; - if (merge_selecting_thread.joinable()) + /// Shut down the leader election thread to avoid suddenly becoming the leader again after + /// we have stopped the merge_selecting_thread, but before we have deleted the leader_election object. + leader_election->shutdown(); + + if (is_leader) { - LOG_INFO(log, "Deleting old leader"); - is_leader_node = false; /// exit trigger inside thread + CurrentMetrics::sub(CurrentMetrics::LeaderReplica); + LOG_INFO(log, "Stopped being leader"); + + is_leader = false; + merge_selecting_event.set(); merge_selecting_thread.join(); } - LOG_INFO(log, "Became leader"); - is_leader_node = true; - merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this); + /// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one + /// replica assigns merges at any given time. + leader_election = nullptr; } @@ -2382,12 +2420,6 @@ void StorageReplicatedMergeTree::startup() void StorageReplicatedMergeTree::shutdown() { - /** This must be done before waiting for restarting_thread. - * Because restarting_thread will wait for finishing of tasks in background pool, - * and parts are fetched in that tasks. - */ - fetcher.blocker.cancelForever(); - if (restarting_thread) { restarting_thread->stop(); @@ -2399,6 +2431,8 @@ void StorageReplicatedMergeTree::shutdown() data_parts_exchange_endpoint_holder->cancelForever(); data_parts_exchange_endpoint_holder = nullptr; } + + fetcher.blocker.cancelForever(); } @@ -2487,7 +2521,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p { assertNotReadonly(); - if (!is_leader_node) + if (!is_leader) { sendRequestToLeaderReplica(query, context.getSettingsRef()); return true; @@ -2813,7 +2847,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt zkutil::ZooKeeperPtr zookeeper = getZooKeeper(); - if (!is_leader_node) + if (!is_leader) { sendRequestToLeaderReplica(query, context.getSettingsRef()); return; @@ -3171,7 +3205,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { auto zookeeper = tryGetZooKeeper(); - res.is_leader = is_leader_node; + res.is_leader = is_leader; res.is_readonly = is_readonly; res.is_session_expired = !zookeeper || zookeeper->expired(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index c2b09a77bf1..5d0659f19f5 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -220,8 +220,8 @@ private: /** Is this replica "leading". The leader replica selects the parts to merge. */ - std::atomic_bool is_leader_node {false}; - std::mutex leader_node_mutex; + std::atomic is_leader {false}; + zkutil::LeaderElectionPtr leader_election; InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder; @@ -239,7 +239,6 @@ private: DataPartsExchange::Fetcher fetcher; - zkutil::LeaderElectionPtr leader_election; /// When activated, replica is initialized and startup() method could exit Poco::Event startup_event; @@ -368,9 +367,15 @@ private: */ bool queueTask(); - /// Select the parts to merge. + /// Postcondition: + /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) + /// or an exception is thrown and leader_election is destroyed. + void enterLeaderElection(); - void becomeLeader(); + /// Postcondition: + /// is_leader is false, merge_selecting_thread is stopped, leader_election is nullptr. + /// leader_election node in ZK is either deleted, or the session is marked expired. + void exitLeaderElection(); /** Selects the parts to merge and writes to the log. */ From 47637c78857c64f5e7b1dadc741e13835954e9f5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 19:26:38 +0300 Subject: [PATCH 02/21] Actualized test #1846 --- .../queries/0_stateless/00441_nulls_in.reference | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dbms/tests/queries/0_stateless/00441_nulls_in.reference b/dbms/tests/queries/0_stateless/00441_nulls_in.reference index 591e55ae41a..81e812f596f 100644 --- a/dbms/tests/queries/0_stateless/00441_nulls_in.reference +++ b/dbms/tests/queries/0_stateless/00441_nulls_in.reference @@ -5,12 +5,12 @@ 0 0 1 -\N +0 1 0 0 1 -\N +0 1 0 0 @@ -27,7 +27,7 @@ 1 0 1 -\N +0 0 1 0 @@ -35,12 +35,12 @@ 0 0 1 -\N +0 1 0 0 1 -\N +0 1 0 0 @@ -57,7 +57,7 @@ 1 0 1 -\N +0 0 1 0 From af226d62f59ebbe9ecbe32ddc80a7aa7bab2fd59 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 21:09:20 +0300 Subject: [PATCH 03/21] Fixed totally wrong code in SummingMergeTree in the case of complex maps [#CLICKHOUSE-2] --- contrib/poco | 2 +- .../SummingSortedBlockInputStream.cpp | 17 ++++++++++++++--- .../00327_summing_composite_nested.sql | 7 +------ 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/contrib/poco b/contrib/poco index 930a7ec1154..a107b0c9cee 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5 +Subproject commit a107b0c9cee109fe0abfbf509df3c78a1e0c05fa diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index eeda7f50e75..e79366ca02d 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -330,7 +330,20 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: // Start aggregations with current row addRow(current); - current_row_is_zero = true; + + if (maps_to_sum.empty()) + { + /// We have only columns_to_aggregate. The status of current row will be determined + /// in 'insertCurrentRowIfNeeded' method on the values of aggregate functions. + current_row_is_zero = true; + } + else + { + /// We have complex maps that will be summed with 'mergeMap' method. + /// The single row is considered non zero, and the status after merging with other rows + /// will be determined in the branch below (when key_differs == false). + current_row_is_zero = false; + } } else { @@ -338,10 +351,8 @@ void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: // Merge maps only for same rows for (const auto & desc : maps_to_sum) - { if (mergeMap(desc, current_row, current)) current_row_is_zero = false; - } } if (!current->isLast()) diff --git a/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql b/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql index 43b37616941..e21389528e4 100644 --- a/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql +++ b/dbms/tests/queries/0_stateless/00327_summing_composite_nested.sql @@ -1,12 +1,7 @@ DROP TABLE IF EXISTS test.summing_composite_key; CREATE TABLE test.summing_composite_key (d Date, k UInt64, FirstMap Nested(k1 UInt32, k2ID Int8, s Float64), SecondMap Nested(k1ID UInt64, k2Key UInt32, k3Type Int32, s Int64)) ENGINE = SummingMergeTree(d, k, 1); -INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 1, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]); -INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 1, [2,1], [4,3], [20,22], [2,2,1], [5,5,0], [-3,-3,-33], [10,100,1000]); - -INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]); -INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 2, [2,1,1], [4,3,3], [20,22,33], [2,2], [5,5], [-3,-3], [10,100]); -INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]); +INSERT INTO test.summing_composite_key VALUES ('2000-01-01', 1, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]), ('2000-01-01', 1, [2,1], [4,3], [20,22], [2,2,1], [5,5,0], [-3,-3,-33], [10,100,1000]), ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]), ('2000-01-01', 2, [2,1,1], [4,3,3], [20,22,33], [2,2], [5,5], [-3,-3], [10,100]), ('2000-01-01', 2, [1,2], [3,4], [10,11], [0,1,2], [3,4,5], [-1,-2,-3], [1,10,100]); SELECT * FROM test.summing_composite_key ORDER BY d, k, _part_index; From a37c5dd60660b1c8a9f4cafb19595b613f8a43f3 Mon Sep 17 00:00:00 2001 From: proller Date: Fri, 6 Apr 2018 21:58:26 +0300 Subject: [PATCH 04/21] revert reverted submodule contrib/poco --- contrib/poco | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/poco b/contrib/poco index a107b0c9cee..930a7ec1154 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit a107b0c9cee109fe0abfbf509df3c78a1e0c05fa +Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5 From fa776b93c2f541d95fd20c210acac2b3f733cb47 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 22:43:37 +0300 Subject: [PATCH 05/21] Avoid herd effect in ReplicatedMergeTreeCleanupThread [#CLICKHOUSE-2] --- dbms/src/Storages/MergeTree/MergeTreeSettings.h | 3 +++ .../Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp | 5 ++++- .../Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h | 3 +++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 424c634afaf..aa29dccc195 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -104,6 +104,9 @@ struct MergeTreeSettings \ /** Period to clean old queue logs, blocks hashes and parts */ \ M(SettingUInt64, cleanup_delay_period, 30) \ + /** Add uniformly distributed value from 0 to x seconds to cleanup_delay_period \ + to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables */ \ + M(SettingUInt64, cleanup_delay_period_random_add, 10) \ \ /** Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. */ \ M(SettingUInt64, min_relative_delay_to_yield_leadership, 120) \ diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index f7dca129bd3..946a88cbe0e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -3,6 +3,8 @@ #include #include +#include + namespace DB { @@ -25,7 +27,8 @@ void ReplicatedMergeTreeCleanupThread::run() { setThreadName("ReplMTCleanup"); - const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000; + const auto CLEANUP_SLEEP_MS = (storage.data.settings.cleanup_delay_period + + std::uniform_int_distribution(0, storage.data.settings.cleanup_delay_period_random_add)(rng)) * 1000; while (!storage.shutdown_called) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index c717e84bfd5..ccbb564fa96 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -7,6 +7,8 @@ #include #include +#include + namespace DB { @@ -27,6 +29,7 @@ private: StorageReplicatedMergeTree & storage; Logger * log; std::thread thread; + pcg64 rng; void run(); void iterate(); From ee7fe63b69aedbf6d13f5d9a7984a272dfb3262b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 22:44:55 +0300 Subject: [PATCH 06/21] Avoid herd effect in ReplicatedMergeTreeCleanupThread (continued) [#CLICKHOUSE-2] --- contrib/poco | 2 +- .../Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/contrib/poco b/contrib/poco index 930a7ec1154..a107b0c9cee 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5 +Subproject commit a107b0c9cee109fe0abfbf509df3c78a1e0c05fa diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 946a88cbe0e..f61fc0d02ec 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -27,8 +27,8 @@ void ReplicatedMergeTreeCleanupThread::run() { setThreadName("ReplMTCleanup"); - const auto CLEANUP_SLEEP_MS = (storage.data.settings.cleanup_delay_period - + std::uniform_int_distribution(0, storage.data.settings.cleanup_delay_period_random_add)(rng)) * 1000; + const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000 + + std::uniform_int_distribution(0, storage.data.settings.cleanup_delay_period_random_add * 1000)(rng); while (!storage.shutdown_called) { From 0d8d99e7a44a4bc4818e59ab070c59775d5e816d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 22:48:54 +0300 Subject: [PATCH 07/21] Fixed typo [#CLICKHOUSE-2] --- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 10 +++++----- dbms/src/Storages/StorageReplicatedMergeTree.h | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 39213653478..666a77e70fb 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3637,7 +3637,7 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK() void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, - NameSet * parts_should_be_retied) + NameSet * parts_should_be_retried) { zkutil::Requests ops; auto it_first_node_in_batch = part_names.cbegin(); @@ -3668,9 +3668,9 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & { LOG_DEBUG(log, "There is no part " << *it_in_batch << " in ZooKeeper, it was only in filesystem"); } - else if (parts_should_be_retied && zkutil::isHardwareError(cur_code)) + else if (parts_should_be_retried && zkutil::isHardwareError(cur_code)) { - parts_should_be_retied->emplace(*it_in_batch); + parts_should_be_retried->emplace(*it_in_batch); } else if (cur_code) { @@ -3678,10 +3678,10 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & } } } - else if (parts_should_be_retied && zkutil::isHardwareError(code)) + else if (parts_should_be_retried && zkutil::isHardwareError(code)) { for (auto it_in_batch = it_first_node_in_batch; it_in_batch != it_next; ++it_in_batch) - parts_should_be_retied->emplace(*it_in_batch); + parts_should_be_retried->emplace(*it_in_batch); } else if (code) { diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index c2b09a77bf1..32e204cac21 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -334,7 +334,7 @@ private: /// Quickly removes big set of parts from ZooKeeper (using async multi queries) void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, - NameSet * parts_should_be_retied = nullptr); + NameSet * parts_should_be_retried = nullptr); /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts. void removePartAndEnqueueFetch(const String & part_name); From 708dc5ef82e6d4a54162d1ec7671434801deedb4 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 22:54:18 +0300 Subject: [PATCH 08/21] ZooKeeper: fixed error [#CLICKHOUSE-2] --- dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 7b105ddec5a..37ff3078d75 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -714,8 +714,8 @@ void ZooKeeper::sendThread() RequestInfo info; if (requests_queue.tryPop(info, max_wait)) { - if (expired) - break; + /// After we popped element from the queue, we must register callbacks (even in the case when expired == true right now), + /// because they must not be lost (callbacks must be called because the user will wait for them). if (info.request->xid != close_xid) { @@ -732,6 +732,9 @@ void ZooKeeper::sendThread() watches[info.request->getPath()].emplace_back(std::move(info.watch)); } + if (expired) + break; + info.request->write(*out); if (info.request->xid == close_xid) From 9b3169a331470ee3ed2558be75e4d56fcf46827c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 6 Apr 2018 23:03:35 +0300 Subject: [PATCH 09/21] Fixed build with clang 5 (although it is Ok on clang 6) [#CLICKHOUSE-2] --- dbms/src/Common/tests/dump_variable.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Common/tests/dump_variable.cpp b/dbms/src/Common/tests/dump_variable.cpp index d7fcdb4bdb7..3213435ab17 100644 --- a/dbms/src/Common/tests/dump_variable.cpp +++ b/dbms/src/Common/tests/dump_variable.cpp @@ -37,7 +37,7 @@ int main(int, char **) std::initializer_list list{"hello", "world"}; DUMP(list); - std::array arr{"hello", "world"}; + std::array arr{{"hello", "world"}}; DUMP(arr); //DUMP([]{}); From 55934058c603339403764b236eacd5431c041739 Mon Sep 17 00:00:00 2001 From: proller Date: Fri, 6 Apr 2018 23:22:33 +0300 Subject: [PATCH 10/21] revert reverted submodule contrib/poco --- contrib/poco | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/poco b/contrib/poco index a107b0c9cee..930a7ec1154 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit a107b0c9cee109fe0abfbf509df3c78a1e0c05fa +Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5 From f1f1f095005cb479c57db5e120d741e2f8f8cace Mon Sep 17 00:00:00 2001 From: proller Date: Fri, 6 Apr 2018 13:49:42 -0700 Subject: [PATCH 11/21] Debian: temporary keep packages with old names (#2186) * cmake fixes * Fix test server config * Debian: temporary keep packages with old names * fix * fix * Debian postinst: adjustable user --- debian/clickhouse-client.postinst | 4 +++- debian/clickhouse-server-base.cron.d | 1 + debian/clickhouse-server-base.install | 11 +++++++++++ debian/clickhouse-server-base.postinst | 1 + debian/clickhouse-server-base.preinst | 1 + debian/clickhouse-server-base.prerm | 1 + debian/clickhouse-server-base.service | 1 + debian/clickhouse-server-common.install | 2 ++ debian/clickhouse-server.postinst | 8 ++++---- debian/control | 14 ++++++++++++++ debian/rules | 5 +++++ release | 2 +- 12 files changed, 45 insertions(+), 6 deletions(-) create mode 120000 debian/clickhouse-server-base.cron.d create mode 100644 debian/clickhouse-server-base.install create mode 120000 debian/clickhouse-server-base.postinst create mode 120000 debian/clickhouse-server-base.preinst create mode 120000 debian/clickhouse-server-base.prerm create mode 120000 debian/clickhouse-server-base.service create mode 100644 debian/clickhouse-server-common.install diff --git a/debian/clickhouse-client.postinst b/debian/clickhouse-client.postinst index 355676990b6..ff54e3a58fc 100644 --- a/debian/clickhouse-client.postinst +++ b/debian/clickhouse-client.postinst @@ -1,7 +1,9 @@ #!/bin/sh set -e +CLICKHOUSE_USER=${CLICKHOUSE_USER=clickhouse} + mkdir -p /etc/clickhouse-client/conf.d # user created by clickhouse-server package -chown -R clickhouse /etc/clickhouse-client || true +chown -R ${CLICKHOUSE_USER} /etc/clickhouse-client || true diff --git a/debian/clickhouse-server-base.cron.d b/debian/clickhouse-server-base.cron.d new file mode 120000 index 00000000000..23e744386dd --- /dev/null +++ b/debian/clickhouse-server-base.cron.d @@ -0,0 +1 @@ +clickhouse-server.cron.d \ No newline at end of file diff --git a/debian/clickhouse-server-base.install b/debian/clickhouse-server-base.install new file mode 100644 index 00000000000..971955da925 --- /dev/null +++ b/debian/clickhouse-server-base.install @@ -0,0 +1,11 @@ +usr/bin/clickhouse +usr/bin/clickhouse-server +usr/bin/clickhouse-clang +usr/bin/clickhouse-lld +usr/bin/clickhouse-copier +usr/bin/clickhouse-report +etc/systemd/system/clickhouse-server.service +etc/init.d/clickhouse-server +etc/cron.d/clickhouse-server +usr/share/clickhouse/* +etc/security/limits.d/clickhouse.conf diff --git a/debian/clickhouse-server-base.postinst b/debian/clickhouse-server-base.postinst new file mode 120000 index 00000000000..42fbd368922 --- /dev/null +++ b/debian/clickhouse-server-base.postinst @@ -0,0 +1 @@ +clickhouse-server.postinst \ No newline at end of file diff --git a/debian/clickhouse-server-base.preinst b/debian/clickhouse-server-base.preinst new file mode 120000 index 00000000000..dbc74e163bf --- /dev/null +++ b/debian/clickhouse-server-base.preinst @@ -0,0 +1 @@ +clickhouse-server.preinst \ No newline at end of file diff --git a/debian/clickhouse-server-base.prerm b/debian/clickhouse-server-base.prerm new file mode 120000 index 00000000000..03f62e02475 --- /dev/null +++ b/debian/clickhouse-server-base.prerm @@ -0,0 +1 @@ +clickhouse-server.prerm \ No newline at end of file diff --git a/debian/clickhouse-server-base.service b/debian/clickhouse-server-base.service new file mode 120000 index 00000000000..b00af30916c --- /dev/null +++ b/debian/clickhouse-server-base.service @@ -0,0 +1 @@ +clickhouse-server.service \ No newline at end of file diff --git a/debian/clickhouse-server-common.install b/debian/clickhouse-server-common.install new file mode 100644 index 00000000000..7237e9914d5 --- /dev/null +++ b/debian/clickhouse-server-common.install @@ -0,0 +1,2 @@ +etc/clickhouse-server/config.xml etc/clickhouse-server +etc/clickhouse-server/users.xml etc/clickhouse-server diff --git a/debian/clickhouse-server.postinst b/debian/clickhouse-server.postinst index 6946f8b2728..3476ef665ef 100644 --- a/debian/clickhouse-server.postinst +++ b/debian/clickhouse-server.postinst @@ -1,10 +1,10 @@ #!/bin/sh set -e -CLICKHOUSE_USER=clickhouse -CLICKHOUSE_GROUP=${CLICKHOUSE_USER} -CLICKHOUSE_DATADIR=/var/lib/clickhouse -CLICKHOUSE_LOGDIR=/var/log/clickhouse-server +CLICKHOUSE_USER=${CLICKHOUSE_USER=clickhouse} +CLICKHOUSE_GROUP=${CLICKHOUSE_GROUP=${CLICKHOUSE_USER}} +CLICKHOUSE_DATADIR=${CLICKHOUSE_DATADIR=/var/lib/clickhouse} +CLICKHOUSE_LOGDIR=${CLICKHOUSE_LOGDIR=/var/log/clickhouse-server} test -f /etc/default/clickhouse && . /etc/default/clickhouse diff --git a/debian/control b/debian/control index 8f57ae258f4..1b3f4656ecb 100644 --- a/debian/control +++ b/debian/control @@ -66,3 +66,17 @@ Priority: extra Architecture: any Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-client, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo, openssl Description: Clickhouse tests + + +# TODO: Remove: + +Package: clickhouse-server-base +Architecture: any +Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, tzdata +Description: DEPRECATED PACKAGE: Server binary for clickhouse + + +Package: clickhouse-server-common +Architecture: any +Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-server-base (= ${binary:Version}) +Description: DEPRECATED PACKAGE: Common configuration files for clickhouse-server-base package diff --git a/debian/rules b/debian/rules index fb21adf9984..245a148ff89 100755 --- a/debian/rules +++ b/debian/rules @@ -97,6 +97,11 @@ override_dh_install: touch $(DESTDIR)/etc/clickhouse-server/metrika/config.xml touch $(DESTDIR)/etc/clickhouse-server/metrika/users.xml + # todo: remove after removing clickhouse-server-base package: + mkdir -p $(DESTDIR)/etc/init.d $(DESTDIR)/etc/cron.d + cp debian/clickhouse-server.init $(DESTDIR)/etc/init.d/clickhouse-server + cp debian/clickhouse-server.cron.d $(DESTDIR)/etc/cron.d/clickhouse-server + dh_install --list-missing --sourcedir=$(DESTDIR) override_dh_auto_install: diff --git a/release b/release index ae19a8ada46..e2ff2579dde 100755 --- a/release +++ b/release @@ -43,7 +43,7 @@ do shift elif [[ $1 == '--fast' ]]; then # Wrong but fast pbuilder mode: create base package with all depends - EXTRAPACKAGES="$EXTRAPACKAGES debhelper cmake gcc-7 g++-7 libc6-dev libmariadbclient-dev libicu-dev libltdl-dev libreadline-dev libssl-dev unixodbc-dev psmisc bash expect python python-lxml python-termcolor curl perl sudo openssl" + EXTRAPACKAGES="$EXTRAPACKAGES debhelper cmake ninja-build gcc-7 g++-7 libc6-dev libmariadbclient-dev libicu-dev libltdl-dev libreadline-dev libssl-dev unixodbc-dev psmisc bash expect python python-lxml python-termcolor curl perl sudo openssl" shift else echo "Unknown option $1" From 984d7044ac5e928741ca8a478700f015f44cc1f8 Mon Sep 17 00:00:00 2001 From: robot-metrika-test Date: Fri, 6 Apr 2018 23:52:52 +0300 Subject: [PATCH 12/21] Auto version update to [54375] --- dbms/cmake/version.cmake | 4 ++-- debian/changelog | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dbms/cmake/version.cmake b/dbms/cmake/version.cmake index 62d9d47d8d8..eeaff159b27 100644 --- a/dbms/cmake/version.cmake +++ b/dbms/cmake/version.cmake @@ -1,6 +1,6 @@ # This strings autochanged from release_lib.sh: -set(VERSION_DESCRIBE v1.1.54374-testing) -set(VERSION_REVISION 54374) +set(VERSION_DESCRIBE v1.1.54375-testing) +set(VERSION_REVISION 54375) # end of autochange set (VERSION_MAJOR 1) diff --git a/debian/changelog b/debian/changelog index 68694a3522d..675f47672a2 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,5 +1,5 @@ -clickhouse (1.1.54374) unstable; urgency=low +clickhouse (1.1.54375) unstable; urgency=low * Modified source code - -- Thu, 05 Apr 2018 21:26:54 +0300 + -- Fri, 06 Apr 2018 23:52:52 +0300 From 4e5e0fa664188a0f36e72df5b34c19d6c4dcfc4a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 7 Apr 2018 00:46:57 +0300 Subject: [PATCH 13/21] ReplicatedMergeTree: Better diagnostics [#CLICKHOUSE-2] --- .../MergeTree/ReplicatedMergeTreeAlterThread.cpp | 2 +- .../MergeTree/ReplicatedMergeTreeCleanupThread.cpp | 2 +- .../ReplicatedMergeTreePartCheckThread.cpp | 4 ++-- .../ReplicatedMergeTreeRestartingThread.cpp | 6 +++--- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 14 +++++++------- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp index 3b9099f23eb..bc6f58f698a 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp @@ -190,7 +190,7 @@ void ReplicatedMergeTreeAlterThread::run() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); force_recheck_parts = true; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index f61fc0d02ec..e0d0781584f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -38,7 +38,7 @@ void ReplicatedMergeTreeCleanupThread::run() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 85e58f4551b..6dbf462952a 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -265,7 +265,7 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name) { /// TODO Better to check error code. - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch."); ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); @@ -383,7 +383,7 @@ void ReplicatedMergeTreePartCheckThread::run() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); wakeup_event.tryWait(PART_CHECK_ERROR_SLEEP_MS); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 6b20b5c86c1..e71fbca2ef9 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -93,7 +93,7 @@ void ReplicatedMergeTreeRestartingThread::run() catch (const zkutil::KeeperException & e) { /// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again. - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); if (first_time) storage.startup_event.set(); @@ -158,7 +158,7 @@ void ReplicatedMergeTreeRestartingThread::run() catch (...) { storage.startup_event.set(); - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } wakeup_event.tryWait(check_period_ms); @@ -179,7 +179,7 @@ void ReplicatedMergeTreeRestartingThread::run() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } LOG_DEBUG(log, "Restarting thread finished"); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 666a77e70fb..c0d135d1882 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -234,7 +234,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( /// Failed to connect to ZK (this became known when trying to perform the first operation). if (e.code == ZooKeeperImpl::ZooKeeper::ZCONNECTIONLOSS) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); current_zookeeper = nullptr; } else @@ -1468,7 +1468,7 @@ bool StorageReplicatedMergeTree::executeFetch(const StorageReplicatedMergeTree:: } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } throw; @@ -1606,7 +1606,7 @@ void StorageReplicatedMergeTree::queueUpdatingThread() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS); } } @@ -1626,7 +1626,7 @@ bool StorageReplicatedMergeTree::queueTask() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } LogEntryPtr & entry = selected.first; @@ -1660,7 +1660,7 @@ bool StorageReplicatedMergeTree::queueTask() LOG_INFO(log, e.displayText()); } else - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); /** This exception will be written to the queue element, and it can be looked up using `system.replication_queue` table. * The thread that performs this action will sleep a few seconds after the exception. @@ -1670,7 +1670,7 @@ bool StorageReplicatedMergeTree::queueTask() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); throw; } }); @@ -1929,7 +1929,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } if (shutdown_called || !is_leader_node) From 3fd2773151e5097ca7ce33143c215be046f816bb Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 7 Apr 2018 04:46:50 +0300 Subject: [PATCH 14/21] Improved code after introduction of method "getHeader" in every stream [#CLICKHOUSE-2] --- .../AggregatingSortedBlockInputStream.cpp | 68 ++-- .../AggregatingSortedBlockInputStream.h | 6 +- .../CollapsingSortedBlockInputStream.cpp | 9 +- .../CollapsingSortedBlockInputStream.h | 7 +- .../GraphiteRollupSortedBlockInputStream.cpp | 50 +-- .../GraphiteRollupSortedBlockInputStream.h | 8 +- .../MergeSortingBlockInputStream.cpp | 19 +- .../MergeSortingBlockInputStream.h | 7 +- .../MergingSortedBlockInputStream.cpp | 39 +-- .../MergingSortedBlockInputStream.h | 12 +- .../ReplacingSortedBlockInputStream.cpp | 11 +- .../ReplacingSortedBlockInputStream.h | 11 +- .../SummingSortedBlockInputStream.cpp | 324 +++++++++--------- .../SummingSortedBlockInputStream.h | 9 +- ...sionedCollapsingSortedBlockInputStream.cpp | 26 +- ...ersionedCollapsingSortedBlockInputStream.h | 16 +- 16 files changed, 291 insertions(+), 331 deletions(-) diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 0f27cfdb2ca..7b431e206e9 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -12,15 +12,46 @@ namespace ErrorCodes } +AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_) +{ + /// Fill in the column numbers that need to be aggregated. + for (size_t i = 0; i < num_columns; ++i) + { + ColumnWithTypeAndName & column = header.safeGetByPosition(i); + + /// We leave only states of aggregate functions. + if (!startsWith(column.type->getName(), "AggregateFunction")) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + /// Included into PK? + SortDescription::const_iterator it = description.begin(); + for (; it != description.end(); ++it) + if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) + break; + + if (it != description.end()) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + column_numbers_to_aggregate.push_back(i); + } +} + + Block AggregatingSortedBlockInputStream::readImpl() { if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -28,37 +59,6 @@ Block AggregatingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return Block(); - /// Additional initialization. - if (next_key.empty()) - { - /// Fill in the column numbers that need to be aggregated. - for (size_t i = 0; i < num_columns; ++i) - { - ColumnWithTypeAndName & column = header.safeGetByPosition(i); - - /// We leave only states of aggregate functions. - if (!startsWith(column.type->getName(), "AggregateFunction")) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - /// Included into PK? - SortDescription::const_iterator it = description.begin(); - for (; it != description.end(); ++it) - if (it->column_name == column.name || (it->column_name.empty() && it->column_number == i)) - break; - - if (it != description.end()) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - column_numbers_to_aggregate.push_back(i); - } - } - columns_to_aggregate.resize(column_numbers_to_aggregate.size()); for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i) columns_to_aggregate[i] = typeid_cast(merged_columns[column_numbers_to_aggregate[i]].get()); diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h index e428b3b7e20..5047158aa2d 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h @@ -21,10 +21,8 @@ namespace DB class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - AggregatingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_) - { - } + AggregatingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_); String getName() const override { return "AggregatingSorted"; } diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 9b70bd6b89a..01127b5029b 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -108,10 +108,8 @@ Block CollapsingSortedBlockInputStream::readImpl() if (finished) return {}; - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -119,11 +117,6 @@ Block CollapsingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return {}; - /// Additional initialization. - if (first_negative.empty()) - sign_column_number = header.getPositionByName(sign_column); - - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 7280dda02b1..e8650b4efc5 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -25,10 +25,10 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream public: CollapsingSortedBlockInputStream( BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) + const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) - , sign_column(sign_column_) { + sign_column_number = header.getPositionByName(sign_column); } String getName() const override { return "CollapsingSorted"; } @@ -38,8 +38,7 @@ protected: Block readImpl() override; private: - String sign_column; - size_t sign_column_number = 0; + size_t sign_column_number; Logger * log = &Logger::get("CollapsingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index 5da53d8eea5..0a2273d45a9 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -12,6 +12,31 @@ namespace ErrorCodes } +GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + const Graphite::Params & params, time_t time_of_merge) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), + params(params), time_of_merge(time_of_merge) +{ + size_t max_size_of_aggregate_state = 0; + for (const auto & pattern : params.patterns) + if (pattern.function->sizeOfData() > max_size_of_aggregate_state) + max_size_of_aggregate_state = pattern.function->sizeOfData(); + + place_for_aggregate_state.resize(max_size_of_aggregate_state); + + /// Memoize column numbers in block. + path_column_num = header.getPositionByName(params.path_column_name); + time_column_num = header.getPositionByName(params.time_column_name); + value_column_num = header.getPositionByName(params.value_column_name); + version_column_num = header.getPositionByName(params.version_column_name); + + for (size_t i = 0; i < num_columns; ++i) + if (i != time_column_num && i != value_column_num && i != version_column_num) + unmodified_column_numbers.push_back(i); +} + + const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const { for (const auto & pattern : params.patterns) @@ -68,10 +93,8 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -79,27 +102,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() if (merged_columns.empty()) return Block(); - /// Additional initialization. - if (is_first) - { - size_t max_size_of_aggregate_state = 0; - for (const auto & pattern : params.patterns) - if (pattern.function->sizeOfData() > max_size_of_aggregate_state) - max_size_of_aggregate_state = pattern.function->sizeOfData(); - - place_for_aggregate_state.resize(max_size_of_aggregate_state); - - /// Memoize column numbers in block. - path_column_num = header.getPositionByName(params.path_column_name); - time_column_num = header.getPositionByName(params.time_column_name); - value_column_num = header.getPositionByName(params.value_column_name); - version_column_num = header.getPositionByName(params.version_column_name); - - for (size_t i = 0; i < num_columns; ++i) - if (i != time_column_num && i != value_column_num && i != version_column_num) - unmodified_column_numbers.push_back(i); - } - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index c256d27064d..15dfe7c0f4d 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -126,12 +126,8 @@ class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStrea { public: GraphiteRollupSortedBlockInputStream( - BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, - const Graphite::Params & params, time_t time_of_merge) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), - params(params), time_of_merge(time_of_merge) - { - } + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + const Graphite::Params & params, time_t time_of_merge); String getName() const override { return "GraphiteRollupSorted"; } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 628de41b32e..abfcdc89698 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -63,14 +63,21 @@ static void enrichBlockWithConstants(Block & block, const Block & header) } +MergeSortingBlockInputStream::MergeSortingBlockInputStream( + const BlockInputStreamPtr & input, SortDescription & description_, + size_t max_merged_block_size_, size_t limit_, + size_t max_bytes_before_external_sort_, const std::string & tmp_path_) + : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), + max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) +{ + children.push_back(input); + header = getHeader(); + removeConstantsFromSortDescription(header, description); +} + + Block MergeSortingBlockInputStream::readImpl() { - if (!header) - { - header = getHeader(); - removeConstantsFromSortDescription(header, description); - } - /** Algorithm: * - read to memory blocks from source stream; * - if too many of them and if external sorting is enabled, diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 416dc0ecce7..498837f3bff 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -73,12 +73,7 @@ public: /// limit - if not 0, allowed to return just first 'limit' rows in sorted order. MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, size_t max_merged_block_size_, size_t limit_, - size_t max_bytes_before_external_sort_, const std::string & tmp_path_) - : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_), - max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) - { - children.push_back(input); - } + size_t max_bytes_before_external_sort_, const std::string & tmp_path_); String getName() const override { return "MergeSorting"; } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index be90a00e4b9..62b32330679 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -15,15 +15,17 @@ namespace ErrorCodes MergingSortedBlockInputStream::MergingSortedBlockInputStream( - BlockInputStreams & inputs_, const SortDescription & description_, - size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) + const BlockInputStreams & inputs_, const SortDescription & description_, + size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_) , source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); + header = children.at(0)->getHeader(); + num_columns = header.columns(); } -void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns) +void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) { /// Read the first blocks, initialize the queue. if (first) @@ -44,9 +46,6 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged if (rows == 0) continue; - if (!num_columns) - num_columns = shared_block_ptr->columns(); - if (expected_block_size < rows) expected_block_size = std::min(rows, max_block_size); @@ -62,32 +61,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged initQueue(queue); } - /// Initialize the result. - - /// We clone the structure of the first non-empty source block. - { - auto it = source_blocks.cbegin(); - for (; it != source_blocks.cend(); ++it) - { - const SharedBlockPtr & shared_block_ptr = *it; - - if (*shared_block_ptr) - { - header = shared_block_ptr->cloneEmpty(); - break; - } - } - - /// If all the input blocks are empty. - if (it == source_blocks.cend()) - return; - } - /// Let's check that all source blocks have the same structure. - for (auto it = source_blocks.cbegin(); it != source_blocks.cend(); ++it) + for (const SharedBlockPtr & shared_block_ptr : source_blocks) { - const SharedBlockPtr & shared_block_ptr = *it; - if (!*shared_block_ptr) continue; @@ -120,10 +96,9 @@ Block MergingSortedBlockInputStream::readImpl() if (children.size() == 1) return children[0]->read(); - Block header; MutableColumns merged_columns; - init(header, merged_columns); + init(merged_columns); if (merged_columns.empty()) return {}; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 6391f52dcd5..825a9e1fcc3 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -65,8 +65,8 @@ public: * quiet - don't log profiling info */ MergingSortedBlockInputStream( - BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, - size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); + const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, + size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); String getName() const override { return "MergingSorted"; } @@ -74,7 +74,7 @@ public: bool isSortedOutput() const override { return true; } const SortDescription & getSortDescription() const override { return description; } - Block getHeader() const override { return children.at(0)->getHeader(); } + Block getHeader() const override { return header; } protected: struct RowRef @@ -120,14 +120,16 @@ protected: void readSuffixImpl() override; - /// Initializes the queue and the next result block. - void init(Block & header, MutableColumns & merged_columns); + /// Initializes the queue and the columns of next result block. + void init(MutableColumns & merged_columns); /// Gets the next block from the source corresponding to the `current`. template void fetchNextBlock(const TSortCursor & current, std::priority_queue & queue); + Block header; + const SortDescription description; const size_t max_block_size; size_t limit; diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index 553e3a01e4b..8fcfdfe2d58 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -35,10 +35,8 @@ Block ReplacingSortedBlockInputStream::readImpl() if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -46,13 +44,6 @@ Block ReplacingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return Block(); - /// Additional initialization. - if (selected_row.empty()) - { - if (!version_column.empty()) - version_column_number = header.getPositionByName(version_column); - } - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index b8592a0e5b6..d0a7594c69a 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -15,11 +15,13 @@ namespace DB class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, - const String & version_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_), - version_column(version_column_) + ReplacingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, + const String & version_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) { + if (!version_column.empty()) + version_column_number = header.getPositionByName(version_column); } String getName() const override { return "ReplacingSorted"; } @@ -29,7 +31,6 @@ protected: Block readImpl() override; private: - String version_column; ssize_t version_column_number = -1; Logger * log = &Logger::get("ReplacingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index e79366ca02d..e914b8f8b65 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -24,6 +24,168 @@ namespace ErrorCodes } +namespace +{ + bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) + { + for (auto & desc : description) + if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) + return true; + + return false; + } +} + + +SummingSortedBlockInputStream::SummingSortedBlockInputStream( + const BlockInputStreams & inputs_, + const SortDescription & description_, + /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. + const Names & column_names_to_sum, + size_t max_block_size_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_) +{ + current_row.resize(num_columns); + + /// name of nested structure -> the column numbers that refer to it. + std::unordered_map> discovered_maps; + + /** Fill in the column numbers, which must be summed. + * This can only be numeric columns that are not part of the sort key. + * If a non-empty column_names_to_sum is specified, then we only take these columns. + * Some columns from column_names_to_sum may not be found. This is ignored. + */ + for (size_t i = 0; i < num_columns; ++i) + { + const ColumnWithTypeAndName & column = header.safeGetByPosition(i); + + /// Discover nested Maps and find columns for summation + if (typeid_cast(column.type.get())) + { + const auto map_name = Nested::extractTableName(column.name); + /// if nested table name ends with `Map` it is a possible candidate for special handling + if (map_name == column.name || !endsWith(map_name, "Map")) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + discovered_maps[map_name].emplace_back(i); + } + else + { + if (!column.type->isSummable()) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + /// Are they inside the PK? + if (isInPrimaryKey(description, column.name, i)) + { + column_numbers_not_to_aggregate.push_back(i); + continue; + } + + if (column_names_to_sum.empty() + || column_names_to_sum.end() != + std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) + { + // Create aggregator to sum this column + AggregateDescription desc; + desc.column_numbers = {i}; + desc.init("sumWithOverflow", {column.type}); + columns_to_aggregate.emplace_back(std::move(desc)); + } + else + { + // Column is not going to be summed, use last value + column_numbers_not_to_aggregate.push_back(i); + } + } + } + + /// select actual nested Maps from list of candidates + for (const auto & map : discovered_maps) + { + /// map should contain at least two elements (key -> value) + if (map.second.size() < 2) + { + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + continue; + } + + /// no elements of map could be in primary key + auto column_num_it = map.second.begin(); + for (; column_num_it != map.second.end(); ++column_num_it) + if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) + break; + if (column_num_it != map.second.end()) + { + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + continue; + } + + DataTypes argument_types; + AggregateDescription desc; + MapDescription map_desc; + + column_num_it = map.second.begin(); + for (; column_num_it != map.second.end(); ++column_num_it) + { + const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); + const String & name = key_col.name; + const IDataType & nested_type = *static_cast(key_col.type.get())->getNestedType(); + + if (column_num_it == map.second.begin() + || endsWith(name, "ID") + || endsWith(name, "Key") + || endsWith(name, "Type")) + { + if (!nested_type.isValueRepresentedByInteger()) + break; + + map_desc.key_col_nums.push_back(*column_num_it); + } + else + { + if (!nested_type.isSummable()) + break; + + map_desc.val_col_nums.push_back(*column_num_it); + } + + // Add column to function arguments + desc.column_numbers.push_back(*column_num_it); + argument_types.push_back(key_col.type); + } + + if (column_num_it != map.second.end()) + { + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + continue; + } + + if (map_desc.key_col_nums.size() == 1) + { + // Create summation for all value columns in the map + desc.init("sumMap", argument_types); + columns_to_aggregate.emplace_back(std::move(desc)); + } + else + { + // Fall back to legacy mergeMaps for composite keys + for (auto col : map.second) + column_numbers_not_to_aggregate.push_back(col); + maps_to_sum.emplace_back(std::move(map_desc)); + } + } +} + + void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion) { for (auto & desc : columns_to_aggregate) @@ -78,28 +240,13 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me } -namespace -{ - bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) - { - for (auto & desc : description) - if (desc.column_name == name || (desc.column_name.empty() && desc.column_number == number)) - return true; - - return false; - } -} - - Block SummingSortedBlockInputStream::readImpl() { if (finished) return Block(); - Block header; MutableColumns merged_columns; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); @@ -107,150 +254,7 @@ Block SummingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return {}; - /// Additional initialization. - if (current_row.empty()) - { - current_row.resize(num_columns); - - /// name of nested structure -> the column numbers that refer to it. - std::unordered_map> discovered_maps; - - /** Fill in the column numbers, which must be summed. - * This can only be numeric columns that are not part of the sort key. - * If a non-empty column_names_to_sum is specified, then we only take these columns. - * Some columns from column_names_to_sum may not be found. This is ignored. - */ - for (size_t i = 0; i < num_columns; ++i) - { - const ColumnWithTypeAndName & column = header.safeGetByPosition(i); - - /// Discover nested Maps and find columns for summation - if (typeid_cast(column.type.get())) - { - const auto map_name = Nested::extractTableName(column.name); - /// if nested table name ends with `Map` it is a possible candidate for special handling - if (map_name == column.name || !endsWith(map_name, "Map")) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - discovered_maps[map_name].emplace_back(i); - } - else - { - if (!column.type->isSummable()) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - /// Are they inside the PK? - if (isInPrimaryKey(description, column.name, i)) - { - column_numbers_not_to_aggregate.push_back(i); - continue; - } - - if (column_names_to_sum.empty() - || column_names_to_sum.end() != - std::find(column_names_to_sum.begin(), column_names_to_sum.end(), column.name)) - { - // Create aggregator to sum this column - AggregateDescription desc; - desc.column_numbers = {i}; - desc.init("sumWithOverflow", {column.type}); - columns_to_aggregate.emplace_back(std::move(desc)); - } - else - { - // Column is not going to be summed, use last value - column_numbers_not_to_aggregate.push_back(i); - } - } - } - - /// select actual nested Maps from list of candidates - for (const auto & map : discovered_maps) - { - /// map should contain at least two elements (key -> value) - if (map.second.size() < 2) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - /// no elements of map could be in primary key - auto column_num_it = map.second.begin(); - for (; column_num_it != map.second.end(); ++column_num_it) - if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) - break; - if (column_num_it != map.second.end()) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - DataTypes argument_types = {}; - AggregateDescription desc; - MapDescription map_desc; - - column_num_it = map.second.begin(); - for (; column_num_it != map.second.end(); ++column_num_it) - { - const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); - const String & name = key_col.name; - const IDataType & nested_type = *static_cast(key_col.type.get())->getNestedType(); - - if (column_num_it == map.second.begin() - || endsWith(name, "ID") - || endsWith(name, "Key") - || endsWith(name, "Type")) - { - if (!nested_type.isValueRepresentedByInteger()) - break; - - map_desc.key_col_nums.push_back(*column_num_it); - } - else - { - if (!nested_type.isSummable()) - break; - - map_desc.val_col_nums.push_back(*column_num_it); - } - - // Add column to function arguments - desc.column_numbers.push_back(*column_num_it); - argument_types.push_back(key_col.type); - } - - if (column_num_it != map.second.end()) - { - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - continue; - } - - if (map_desc.key_col_nums.size() == 1) - { - // Create summation for all value columns in the map - desc.init("sumMap", argument_types); - columns_to_aggregate.emplace_back(std::move(desc)); - } - else - { - // Fall back to legacy mergeMaps for composite keys - for (auto col : map.second) - column_numbers_not_to_aggregate.push_back(col); - maps_to_sum.emplace_back(std::move(map_desc)); - } - } - } - - // Update aggregation result columns for current block + /// Update aggregation result columns for current block for (auto & desc : columns_to_aggregate) { // Wrap aggregated columns in a tuple to match function signature diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index 62df3863fc6..78b61903d01 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -24,14 +24,12 @@ namespace ErrorCodes class SummingSortedBlockInputStream : public MergingSortedBlockInputStream { public: - SummingSortedBlockInputStream(BlockInputStreams inputs_, + SummingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. const Names & column_names_to_sum_, - size_t max_block_size_) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_), column_names_to_sum(column_names_to_sum_) - { - } + size_t max_block_size_); String getName() const override { return "SummingSorted"; } @@ -46,7 +44,6 @@ private: bool finished = false; /// Columns with which values should be summed. - Names column_names_to_sum; /// If set, it is converted to column_numbers_to_aggregate when initialized. ColumnNumbers column_numbers_not_to_aggregate; /** A table can have nested tables that are treated in a special way. diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp index 45c529470c0..071752137c6 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.cpp @@ -2,6 +2,7 @@ #include #include + namespace DB { @@ -11,6 +12,20 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } + +VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream( + const BlockInputStreams & inputs_, const SortDescription & description_, + const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, + WriteBuffer * out_row_sources_buf_) + : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) + , max_rows_in_queue(std::min(std::max(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) + , current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_) +{ + sign_column_number = header.getPositionByName(sign_column_); +} + + + inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source) { if constexpr (sizeof(RowSourcePart) == 1) @@ -52,12 +67,8 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() if (finished) return {}; - Block header; MutableColumns merged_columns; - - bool is_initialized = !first; - - init(header, merged_columns); + init(merged_columns); if (has_collation) throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED); @@ -65,11 +76,6 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl() if (merged_columns.empty()) return {}; - /// Additional initialization. - if (!is_initialized) - sign_column_number = header.getPositionByName(sign_column); - - merge(merged_columns, queue); return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h index 1c299e78e81..636ee5e3833 100644 --- a/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/VersionedCollapsingSortedBlockInputStream.h @@ -6,6 +6,7 @@ #include + namespace DB { @@ -16,6 +17,7 @@ namespace ErrorCodes static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192; + /* Deque with fixed memory size. Allows pushing gaps. * frontGap() returns the number of gaps were inserted before front. * @@ -173,15 +175,9 @@ public: /// Don't need version column. It's in primary key. /// max_rows_in_queue should be about max_block_size_ if we won't store a lot of extra blocks (RowRef holds SharedBlockPtr). VersionedCollapsingSortedBlockInputStream( - BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, - WriteBuffer * out_row_sources_buf_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) - , sign_column(sign_column_) - , max_rows_in_queue(std::min(std::max(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) - , current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_) - { - } + const BlockInputStreams & inputs_, const SortDescription & description_, + const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, + WriteBuffer * out_row_sources_buf_ = nullptr); String getName() const override { return "VersionedCollapsingSorted"; } @@ -190,8 +186,6 @@ protected: Block readImpl() override; private: - String sign_column; - size_t sign_column_number = 0; Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream"); From 3f264f07e9b57ecb295d31005a6ea364085c0502 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 7 Apr 2018 05:29:12 +0300 Subject: [PATCH 15/21] Removed excessive library dependencies #2180 --- dbms/CMakeLists.txt | 2 -- dbms/src/Functions/CMakeLists.txt | 4 +++- dbms/src/Server/CMakeLists.txt | 4 ++-- dbms/src/TableFunctions/CMakeLists.txt | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 5e0bc6257fb..906897fd0f4 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -145,8 +145,6 @@ target_link_libraries (dbms clickhouse_common_config clickhouse_common_io ${MYSQLXX_LIBRARY} - ${FARMHASH_LIBRARIES} - ${METROHASH_LIBRARIES} ${RE2_LIBRARY} ${RE2_ST_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY} diff --git a/dbms/src/Functions/CMakeLists.txt b/dbms/src/Functions/CMakeLists.txt index cf0bf00b075..cbc5288eac5 100644 --- a/dbms/src/Functions/CMakeLists.txt +++ b/dbms/src/Functions/CMakeLists.txt @@ -78,7 +78,9 @@ list(REMOVE_ITEM clickhouse_functions_sources IFunction.cpp FunctionFactory.cpp list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h FunctionHelpers.h) add_library(clickhouse_functions ${clickhouse_functions_sources}) -target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing) + +target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES}) + target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash) target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src) target_include_directories (clickhouse_functions BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR}) diff --git a/dbms/src/Server/CMakeLists.txt b/dbms/src/Server/CMakeLists.txt index a153d3c932a..b2d72fb3c8b 100644 --- a/dbms/src/Server/CMakeLists.txt +++ b/dbms/src/Server/CMakeLists.txt @@ -37,7 +37,7 @@ add_library (clickhouse-extract-from-config-lib ${SPLIT_SHARED} ExtractFromConfi target_link_libraries (clickhouse-extract-from-config-lib clickhouse_common_config clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY}) add_library (clickhouse-client-lib Client.cpp) -target_link_libraries (clickhouse-client-lib clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY}) +target_link_libraries (clickhouse-client-lib clickhouse_functions clickhouse_aggregate_functions ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY}) target_include_directories (clickhouse-client-lib PRIVATE ${READLINE_INCLUDE_DIR}) install (FILES clickhouse-client.xml DESTINATION ${CLICKHOUSE_ETC_DIR}/clickhouse-client COMPONENT clickhouse-client RENAME config.xml) @@ -56,7 +56,7 @@ add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp) target_link_libraries (clickhouse-format-lib dbms clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY}) add_library (clickhouse-copier-lib ClusterCopier.cpp) -target_link_libraries (clickhouse-copier-lib clickhouse-server-lib clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions) +target_link_libraries (clickhouse-copier-lib clickhouse-server-lib clickhouse_functions clickhouse_aggregate_functions) if (USE_EMBEDDED_COMPILER) link_directories (${LLVM_LIBRARY_DIRS}) diff --git a/dbms/src/TableFunctions/CMakeLists.txt b/dbms/src/TableFunctions/CMakeLists.txt index 4708ed9b602..53bfccfa3a2 100644 --- a/dbms/src/TableFunctions/CMakeLists.txt +++ b/dbms/src/TableFunctions/CMakeLists.txt @@ -5,7 +5,7 @@ list(REMOVE_ITEM clickhouse_table_functions_sources ITableFunction.cpp TableFunc list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFunctionFactory.h) add_library(clickhouse_table_functions ${clickhouse_table_functions_sources}) -target_link_libraries(clickhouse_table_functions dbms clickhouse_storages_system ${Poco_Foundation_LIBRARY}) +target_link_libraries(clickhouse_table_functions dbms ${Poco_Foundation_LIBRARY}) if (Poco_SQLODBC_FOUND) target_link_libraries (clickhouse_table_functions ${Poco_SQLODBC_LIBRARY}) From f40817f851d621d15c65496461f738fd4196d26d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 7 Apr 2018 06:07:26 +0300 Subject: [PATCH 16/21] Moved SessionPoolHelpers #2180 --- .../libpocoext/include/Poco/Ext/SessionPoolHelpers.h | 0 .../libpocoext/src/SessionPoolHelpers.cpp | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename dbms/src/Common/PocoSessionPoolHelpers.h => libs/libpocoext/include/Poco/Ext/SessionPoolHelpers.h (100%) rename dbms/src/Common/PocoSessionPoolHelpers.cpp => libs/libpocoext/src/SessionPoolHelpers.cpp (92%) diff --git a/dbms/src/Common/PocoSessionPoolHelpers.h b/libs/libpocoext/include/Poco/Ext/SessionPoolHelpers.h similarity index 100% rename from dbms/src/Common/PocoSessionPoolHelpers.h rename to libs/libpocoext/include/Poco/Ext/SessionPoolHelpers.h diff --git a/dbms/src/Common/PocoSessionPoolHelpers.cpp b/libs/libpocoext/src/SessionPoolHelpers.cpp similarity index 92% rename from dbms/src/Common/PocoSessionPoolHelpers.cpp rename to libs/libpocoext/src/SessionPoolHelpers.cpp index f7fd155cbe9..61c1ace6b96 100644 --- a/dbms/src/Common/PocoSessionPoolHelpers.cpp +++ b/libs/libpocoext/src/SessionPoolHelpers.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include std::shared_ptr createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr) From 18bbd0e61bc891d23a35382787a3ae019fd4c41f Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 7 Apr 2018 06:07:32 +0300 Subject: [PATCH 17/21] Moved SessionPoolHelpers #2180 --- dbms/src/Dictionaries/ODBCDictionarySource.cpp | 2 +- dbms/src/Storages/StorageODBC.cpp | 2 +- libs/libpocoext/CMakeLists.txt | 9 ++++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/dbms/src/Dictionaries/ODBCDictionarySource.cpp b/dbms/src/Dictionaries/ODBCDictionarySource.cpp index 7db8dbe9e34..489b168b3fa 100644 --- a/dbms/src/Dictionaries/ODBCDictionarySource.cpp +++ b/dbms/src/Dictionaries/ODBCDictionarySource.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp index 09791c0e314..39b51d46047 100644 --- a/dbms/src/Storages/StorageODBC.cpp +++ b/dbms/src/Storages/StorageODBC.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include diff --git a/libs/libpocoext/CMakeLists.txt b/libs/libpocoext/CMakeLists.txt index 64745f6e8d2..6b2c09b4bf1 100644 --- a/libs/libpocoext/CMakeLists.txt +++ b/libs/libpocoext/CMakeLists.txt @@ -1,9 +1,16 @@ add_library (pocoext ${SPLIT_SHARED} src/LevelFilterChannel.cpp src/ThreadNumber.cpp + src/SessionPoolHelpers.cpp include/Poco/Ext/LevelFilterChannel.h - include/Poco/Ext/ThreadNumber.h) + include/Poco/Ext/ThreadNumber.h + include/Poco/Ext/SessionPoolHelpers.h) + +if (Poco_Data_FOUND) + target_include_directories (pocoext PRIVATE ${Poco_Data_INCLUDE_DIRS}) + target_link_libraries(pocoext ${Poco_Data_LIBRARY}) +endif() target_include_directories (pocoext PUBLIC include PRIVATE ${COMMON_INCLUDE_DIR}) From babfc6aaf7eff7ca0227f3ddbc212e626d17ca67 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 7 Apr 2018 06:46:20 +0300 Subject: [PATCH 18/21] Fixed error [#CLICKHOUSE-2] --- dbms/src/DataStreams/MergeSortingBlockInputStream.cpp | 2 +- dbms/src/DataStreams/MergeSortingBlockInputStream.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index abfcdc89698..9995dde25f5 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -140,7 +140,7 @@ Block MergeSortingBlockInputStream::readImpl() /// Create sorted streams to merge. for (const auto & file : temporary_files) { - temporary_inputs.emplace_back(std::make_unique(file->path())); + temporary_inputs.emplace_back(std::make_unique(file->path(), header)); inputs_to_merge.emplace_back(temporary_inputs.back()->block_in); } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 498837f3bff..feb882effb0 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -115,8 +115,8 @@ private: CompressedReadBuffer compressed_in; BlockInputStreamPtr block_in; - TemporaryFileStream(const std::string & path) - : file_in(path), compressed_in(file_in), block_in(std::make_shared(compressed_in, 0)) {} + TemporaryFileStream(const std::string & path, const Block & header) + : file_in(path), compressed_in(file_in), block_in(std::make_shared(compressed_in, header, 0)) {} }; std::vector> temporary_inputs; From ee087a15419eaeae747b82a3a3ea116abecb9644 Mon Sep 17 00:00:00 2001 From: proller Date: Fri, 6 Apr 2018 20:47:54 -0700 Subject: [PATCH 19/21] Fix query compile in docker, update docker image to ubuntu 17.10 artful (#2126) * Squashed commit of the following: commit bedcf840b563aad3edb03b43417338fab0e7cb48 Author: proller Date: Mon Apr 2 20:17:36 2018 +0300 Revert "Prepare to new poco (PocoData renamed to PocoSQL)" This reverts commit ad5e11ad88ac4f1c3a5ad21153042e2498ca0d68. commit b7f1c352f0eb132b133846c7214e70a79f26e611 Merge: ad5e11ad8 fb7e2cbd1 Author: proller Date: Mon Apr 2 20:12:10 2018 +0300 Merge remote-tracking branch 'upstream/master' into fix3 commit ad5e11ad88ac4f1c3a5ad21153042e2498ca0d68 Author: proller Date: Mon Apr 2 20:09:49 2018 +0300 Prepare to new poco (PocoData renamed to PocoSQL) commit fcb90ca39dd32a29e29eb68bf559f381e80f74b4 Merge: 9ded77d62 ad137994f Author: proller Date: Mon Apr 2 13:17:01 2018 +0300 Merge remote-tracking branch 'upstream/master' into fix3 commit 9ded77d62aa668c6b7b55d209d5760bc5b517fbf Merge: 14cea9052 412edac65 Author: proller Date: Fri Mar 30 21:06:20 2018 +0300 Merge remote-tracking branch 'upstream/master' into fix3 commit 14cea90524b8b977bff9b85647e00f0e1c26570b Merge: 9b6d88e67 82932f904 Author: proller Date: Fri Mar 30 14:55:42 2018 +0300 Merge remote-tracking branch 'upstream/master' into fix3 commit 9b6d88e67b114a4c42b624690d691988be39f227 Merge: 0afe7b7d1 b99783028 Author: proller Date: Thu Mar 29 20:35:15 2018 +0300 Merge remote-tracking branch 'upstream/master' into fix3 commit 0afe7b7d1f4792403ba4fb33dfb250ece2edc41d Author: proller Date: Wed Mar 28 16:03:55 2018 +0300 add docker/test commit c46f0b4084610a6d36b6823fb5ee48381866272e Author: proller Date: Wed Mar 28 15:43:53 2018 +0300 fix commit 3435dee49f31fe8f6cd9b01da4a2d5820f03a4a4 Author: proller Date: Wed Mar 28 15:18:54 2018 +0300 Fix query compile in docker, update docker image to ubuntu 17.10 artful * Update Dockerfile --- docker/client/Dockerfile | 5 +++-- docker/server/docker_related_config.xml | 6 ++++++ docker/test/Dockerfile | 19 +++++++++++++++++++ docker/test/README.md | 5 +++++ 4 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 docker/test/Dockerfile create mode 100644 docker/test/README.md diff --git a/docker/client/Dockerfile b/docker/client/Dockerfile index 9c8f3d087a3..59998b4a507 100644 --- a/docker/client/Dockerfile +++ b/docker/client/Dockerfile @@ -1,11 +1,12 @@ -FROM ubuntu:16.04 +FROM ubuntu:17.10 ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" ARG version=\* RUN apt-get update && \ - apt-get install -y apt-transport-https && \ + apt-get install -y apt-transport-https dirmngr && \ mkdir -p /etc/apt/sources.list.d && \ + apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 && \ echo $repository | tee /etc/apt/sources.list.d/clickhouse.list && \ apt-get update && \ apt-get install --allow-unauthenticated -y clickhouse-client=$version locales && \ diff --git a/docker/server/docker_related_config.xml b/docker/server/docker_related_config.xml index ab6f82ad4c4..e1df3bb3890 100644 --- a/docker/server/docker_related_config.xml +++ b/docker/server/docker_related_config.xml @@ -3,4 +3,10 @@ 0.0.0.0 :: 1 + + diff --git a/docker/test/Dockerfile b/docker/test/Dockerfile new file mode 100644 index 00000000000..5dfbf73d255 --- /dev/null +++ b/docker/test/Dockerfile @@ -0,0 +1,19 @@ +FROM ubuntu:17.10 + +ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" +ARG version=\* + +RUN apt-get update && \ + apt-get install -y apt-transport-https dirmngr && \ + mkdir -p /etc/apt/sources.list.d && \ + apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 && \ + echo $repository | tee /etc/apt/sources.list.d/clickhouse.list && \ + apt-get update && \ + apt-get install --allow-unauthenticated -y clickhouse-test && \ + rm -rf /var/lib/apt/lists/* /var/cache/debconf && \ + apt-get clean + +# clickhouse-test bug: it doesn't start without server config, remove after release 1.1.54372 : +RUN mkdir -p /etc/clickhouse-server && echo "" > /etc/clickhouse-server/config.xml + +ENTRYPOINT ["/usr/bin/clickhouse-test"] diff --git a/docker/test/README.md b/docker/test/README.md new file mode 100644 index 00000000000..0833aacb822 --- /dev/null +++ b/docker/test/README.md @@ -0,0 +1,5 @@ +# ClickHouse Test Docker Image + +## License + +View [license information](https://github.com/yandex/ClickHouse/blob/master/LICENSE) for the software contained in this image. From 07b81c875e4403053f3d05fa45ec1dac79bd0d28 Mon Sep 17 00:00:00 2001 From: proller Date: Fri, 6 Apr 2018 20:49:49 -0700 Subject: [PATCH 20/21] CLICKHOUSE-3444: show error for old query on replace_running_query (#2127) * Add tests * Fix test * Fix test build * CLICKHOUSE-3444: show error for replace_running_query * fix naming * fix * fix test * Update IProfilingBlockInputStream.cpp --- contrib/zstd | 2 +- dbms/src/DataStreams/IProfilingBlockInputStream.cpp | 2 +- dbms/src/Interpreters/ProcessList.cpp | 2 +- dbms/src/Interpreters/ProcessList.h | 8 ++++---- .../queries/0_stateless/00600_replace_running_query.sh | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/contrib/zstd b/contrib/zstd index 255597502c3..f4340f46b23 160000 --- a/contrib/zstd +++ b/contrib/zstd @@ -1 +1 @@ -Subproject commit 255597502c3a4ef150abc964e376d4202a8c2929 +Subproject commit f4340f46b2387bc8de7d5320c0b83bb1499933ad diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index 8cb570bbf62..09eeff2225c 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -238,7 +238,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value) if (process_list_elem) { if (!process_list_elem->updateProgressIn(value)) - cancel(false); + cancel(/* kill */ true); /// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers. diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index e6e77c85180..59c481e6e3a 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -66,7 +66,7 @@ ProcessList::EntryPtr ProcessList::insert( /// Ask queries to cancel. They will check this flag. for (auto it = range.first; it != range.second; ++it) - it->second->is_cancelled.store(true, std::memory_order_relaxed); + it->second->is_killed.store(true, std::memory_order_relaxed); } } } diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index a76e886414d..ecc29d671fe 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -78,7 +78,7 @@ private: CurrentMetrics::Increment num_queries {CurrentMetrics::Query}; - std::atomic is_cancelled { false }; + std::atomic is_killed { false }; /// Be careful using it. For example, queries field could be modified concurrently. const ProcessListForUser * user_process_list = nullptr; @@ -140,13 +140,13 @@ public: if (priority_handle) priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Could make timeout customizable. - return !is_cancelled.load(std::memory_order_relaxed); + return !is_killed.load(std::memory_order_relaxed); } bool updateProgressOut(const Progress & value) { progress_out.incrementPiecewiseAtomically(value); - return !is_cancelled.load(std::memory_order_relaxed); + return !is_killed.load(std::memory_order_relaxed); } @@ -157,7 +157,7 @@ public: res.query = query; res.client_info = client_info; res.elapsed_seconds = watch.elapsedSeconds(); - res.is_cancelled = is_cancelled.load(std::memory_order_relaxed); + res.is_cancelled = is_killed.load(std::memory_order_relaxed); res.read_rows = progress_in.rows; res.read_bytes = progress_in.bytes; res.total_rows = progress_in.total_rows; diff --git a/dbms/tests/queries/0_stateless/00600_replace_running_query.sh b/dbms/tests/queries/0_stateless/00600_replace_running_query.sh index 37799069779..6778bbce149 100755 --- a/dbms/tests/queries/0_stateless/00600_replace_running_query.sh +++ b/dbms/tests/queries/0_stateless/00600_replace_running_query.sh @@ -5,7 +5,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) set -e -o pipefail -$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT sum(ignore(*)) FROM (SELECT number % 1000 AS k, groupArray(number) FROM numbers(100000000) GROUP BY k)' & +$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT sum(ignore(*)) FROM (SELECT number % 1000 AS k, groupArray(number) FROM numbers(100000000) GROUP BY k)' 2>&1 > /dev/null & sleep 0.1 # First query (usually) should be received by the server after this sleep. -$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 1 WHERE 0' +$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 0' wait From 1e7d616a58ded7986c2d693f511fe890a9d16ac9 Mon Sep 17 00:00:00 2001 From: proller Date: Sat, 7 Apr 2018 14:01:00 +0300 Subject: [PATCH 21/21] Docker fixes (query compiler, compatible? package rename) --- docker/server/Dockerfile | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index 20882fbeee3..1ee459693c0 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -1,14 +1,15 @@ -FROM ubuntu:16.04 +FROM ubuntu:17.10 ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" ARG version=\* RUN apt-get update && \ - apt-get install -y apt-transport-https && \ + apt-get install -y apt-transport-https dirmngr && \ mkdir -p /etc/apt/sources.list.d && \ + apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 && \ echo $repository | tee /etc/apt/sources.list.d/clickhouse.list && \ apt-get update && \ - apt-get install --allow-unauthenticated -y clickhouse-server=$version && \ + apt-get install --allow-unauthenticated -y "clickhouse-server|clickhouse-server-common=$version" libgcc-7-dev && \ rm -rf /var/lib/apt/lists/* /var/cache/debconf && \ apt-get clean