diff --git a/CHANGELOG.md b/CHANGELOG.md index fdb7f5303b1..1d3f9812214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# ClickHouse 1.1.54370 Release Candidate, 2018-03-16 +# ClickHouse release 1.1.54370, 2018-03-16 ## New features: diff --git a/CHANGELOG_RU.md b/CHANGELOG_RU.md index 4e0b481e718..56d4bcd1cb7 100644 --- a/CHANGELOG_RU.md +++ b/CHANGELOG_RU.md @@ -1,4 +1,4 @@ -# ClickHouse 1.1.54370 Release Candidate, 2018-03-16 +# ClickHouse release 1.1.54370, 2018-03-16 ## Новые возможности: diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index e45a53427cb..8c823db478d 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -167,6 +167,8 @@ if (Poco_SQLODBC_FOUND) target_link_libraries (dbms ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY}) target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS}) endif() +target_include_directories (clickhouse_common_io PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include) +target_include_directories (dbms PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include) if (Poco_DataODBC_FOUND) target_link_libraries (clickhouse_common_io ${Poco_Data_LIBRARY}) diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 42cdfd59ffc..8c6d1d0e583 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -234,7 +234,7 @@ bool Connection::ping() { // LOG_TRACE(log_wrapper.get(), "Ping"); - TimeoutSetter timeout_setter(*socket, sync_request_timeout); + TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); try { UInt64 pong = 0; @@ -274,7 +274,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req if (!connected) connect(); - TimeoutSetter timeout_setter(*socket, sync_request_timeout); + TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); writeVarUInt(Protocol::Client::TablesStatusRequest, *out); request.write(*out, server_revision); diff --git a/dbms/src/Client/TimeoutSetter.h b/dbms/src/Client/TimeoutSetter.h index 0e908c1bdac..30ce28e889c 100644 --- a/dbms/src/Client/TimeoutSetter.h +++ b/dbms/src/Client/TimeoutSetter.h @@ -8,24 +8,25 @@ namespace DB { /// Temporarily overrides socket send/recieve timeouts and reset them back into destructor -/// Timeouts could be only decreased +/// If "limit_max_timeout" is true, timeouts could be only decreased (maxed by previous value). struct TimeoutSetter { - TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_) + TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_, + bool limit_max_timeout = false) : socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_) { old_send_timeout = socket.getSendTimeout(); old_receive_timeout = socket.getReceiveTimeout(); - if (old_send_timeout > send_timeout) + if (!limit_max_timeout || old_send_timeout > send_timeout) socket.setSendTimeout(send_timeout); - if (old_receive_timeout > recieve_timeout) + if (!limit_max_timeout || old_receive_timeout > recieve_timeout) socket.setReceiveTimeout(recieve_timeout); } - TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_) - : TimeoutSetter(socket_, timeout_, timeout_) {} + TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout = false) + : TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout) {} ~TimeoutSetter() { diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp index 800bbc04a73..bccfd16b61c 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.cpp @@ -12,7 +12,7 @@ #include #define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000 -#define ZOOKEEPER_OPERATION_TIMEOUT_MS 1000 +#define ZOOKEEPER_OPERATION_TIMEOUT_MS 10000 namespace DB @@ -49,6 +49,9 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, session_timeout_ms = session_timeout_ms_; chroot = chroot_; + if (hosts.empty()) + throw KeeperException("No addresses passed to ZooKeeper constructor.", ZooKeeperImpl::ZooKeeper::ZBADARGUMENTS); + std::vector addresses_strings; boost::split(addresses_strings, hosts, boost::is_any_of(",")); ZooKeeperImpl::ZooKeeper::Addresses addresses; @@ -320,7 +323,7 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat, const EventPtr & w return existsWatch(path, stat, callbackForEvent(watch)); } -bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback) +bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback) { int32_t code = existsImpl(path, stat, watch_callback); @@ -369,7 +372,7 @@ bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code); } -bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * return_code) +bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * return_code) { int32_t code = getImpl(path, res, stat, watch_callback); @@ -527,39 +530,53 @@ void ZooKeeper::tryRemoveRecursive(const std::string & path) } -void ZooKeeper::waitForDisappear(const std::string & path) +namespace { - while (true) + struct WaitForDisappearState { int32_t code = 0; int32_t event_type = 0; Poco::Event event; + }; + using WaitForDisappearStatePtr = std::shared_ptr; +} - auto callback = [&](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response) +void ZooKeeper::waitForDisappear(const std::string & path) +{ + WaitForDisappearStatePtr state = std::make_shared(); + + while (true) + { + auto callback = [state](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response) { - code = response.error; - if (code) - event.set(); + state->code = response.error; + if (state->code) + state->event.set(); }; - auto watch = [&](const ZooKeeperImpl::ZooKeeper::WatchResponse & response) + auto watch = [state](const ZooKeeperImpl::ZooKeeper::WatchResponse & response) { - code = response.error; - if (!code) - event_type = response.type; - event.set(); + if (!state->code) + { + state->code = response.error; + if (!state->code) + state->event_type = response.type; + state->event.set(); + } }; + /// NOTE: if the node doesn't exist, the watch will leak. + impl->exists(path, callback, watch); - event.wait(); + state->event.wait(); - if (code == ZooKeeperImpl::ZooKeeper::ZNONODE) + if (state->code == ZooKeeperImpl::ZooKeeper::ZNONODE) return; - if (code) - throw KeeperException(code, path); + if (state->code) + throw KeeperException(state->code, path); - if (event_type == ZooKeeperImpl::ZooKeeper::DELETED) + if (state->event_type == ZooKeeperImpl::ZooKeeper::DELETED) return; } } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeper.h b/dbms/src/Common/ZooKeeper/ZooKeeper.h index 25b6f4a993a..e91bb20d877 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeper.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeper.h @@ -110,7 +110,7 @@ public: int32_t tryRemove(const std::string & path, int32_t version = -1); bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); - bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback); + bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback); std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr); @@ -118,7 +118,7 @@ public: /// * The node doesn't exist. Returns false in this case. bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr); - bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * code = nullptr); + bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * code = nullptr); void set(const std::string & path, const std::string & data, int32_t version = -1, Stat * stat = nullptr); diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 0895e58c827..67160cc18c8 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -505,7 +506,7 @@ ZooKeeper::ZooKeeper( Poco::Timespan operation_timeout) : root_path(root_path_), session_timeout(session_timeout), - operation_timeout(operation_timeout) + operation_timeout(std::min(operation_timeout, session_timeout)) { if (!root_path.empty()) { @@ -532,9 +533,6 @@ ZooKeeper::ZooKeeper( connect(addresses, connection_timeout); - sendHandshake(); - receiveHandshake(); - if (!auth_scheme.empty()) sendAuth(auth_scheme, auth_data); @@ -549,6 +547,9 @@ void ZooKeeper::connect( const Addresses & addresses, Poco::Timespan connection_timeout) { + if (addresses.empty()) + throw Exception("No addresses passed to ZooKeeperImpl constructor", ZBADARGUMENTS); + static constexpr size_t num_tries = 3; bool connected = false; @@ -559,13 +560,25 @@ void ZooKeeper::connect( { try { + socket = Poco::Net::StreamSocket(); /// Reset the state of previous attempt. socket.connect(address, connection_timeout); + + socket.setReceiveTimeout(operation_timeout); + socket.setSendTimeout(operation_timeout); + socket.setNoDelay(true); + + in.emplace(socket); + out.emplace(socket); + + sendHandshake(); + receiveHandshake(); + connected = true; break; } catch (const Poco::Net::NetException & e) { - fail_reasons << "\n" << getCurrentExceptionMessage(false); + fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << address.toString(); } catch (const Poco::TimeoutException & e) { @@ -591,16 +604,9 @@ void ZooKeeper::connect( out << address.toString(); } - out << fail_reasons.str(); + out << fail_reasons.str() << "\n"; throw Exception(out.str(), ZCONNECTIONLOSS); } - - socket.setReceiveTimeout(operation_timeout); - socket.setSendTimeout(operation_timeout); - socket.setNoDelay(true); - - in.emplace(socket); - out.emplace(socket); } @@ -685,6 +691,8 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data) void ZooKeeper::sendThread() { + setThreadName("ZooKeeperSend"); + auto prev_heartbeat_time = clock::now(); try @@ -703,12 +711,26 @@ void ZooKeeper::sendThread() std::chrono::duration_cast(next_heartbeat_time - now).count(), operation_timeout.totalMilliseconds()); - RequestPtr request; - if (requests.tryPop(request, max_wait)) + RequestInfo info; + if (requests_queue.tryPop(info, max_wait)) { - request->write(*out); + { + CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest); + std::lock_guard lock(operations_mutex); + operations[info.request->xid] = info; + } - if (request->xid == close_xid) + if (info.watch) + { + info.request->has_watch = true; + CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch); + std::lock_guard lock(watches_mutex); + watches[info.request->getPath()].emplace_back(std::move(info.watch)); + } + + info.request->write(*out); + + if (info.request->xid == close_xid) break; } } @@ -732,14 +754,31 @@ void ZooKeeper::sendThread() } /// Drain queue - RequestPtr request; - while (requests.tryPop(request)) - ; + RequestInfo info; + while (requests_queue.tryPop(info)) + { + if (info.callback) + { + ResponsePtr response = info.request->makeResponse(); + response->error = ZSESSIONEXPIRED; + info.callback(*response); + } + if (info.watch) + { + WatchResponse response; + response.type = SESSION; + response.state = EXPIRED_SESSION; + response.error = ZSESSIONEXPIRED; + info.watch(response); + } + } } void ZooKeeper::receiveThread() { + setThreadName("ZooKeeperRecv"); + try { Int64 waited = 0; @@ -970,6 +1009,8 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) bool expired_prev = false; if (expired.compare_exchange_strong(expired_prev, true)) { + active_session_metric_increment.destroy(); + try { if (!error_send) @@ -992,7 +1033,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) { RequestInfo & request_info = op.second; ResponsePtr response = request_info.request->makeResponse(); - response->error = ZCONNECTIONLOSS; + response->error = ZSESSIONEXPIRED; if (request_info.callback) request_info.callback(*response); } @@ -1009,7 +1050,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive) WatchResponse response; response.type = SESSION; response.state = EXPIRED_SESSION; - response.error = ZCONNECTIONLOSS; + response.error = ZSESSIONEXPIRED; for (auto & callback : path_watches.second) if (callback) @@ -1244,21 +1285,7 @@ void ZooKeeper::pushRequest(RequestInfo && info) ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); - { - CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest); - std::lock_guard lock(operations_mutex); - operations[info.request->xid] = info; - } - - if (info.watch) - { - info.request->has_watch = true; - CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch); - std::lock_guard lock(watches_mutex); - watches[info.request->getPath()].emplace_back(std::move(info.watch)); - } - - if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds())) + if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds())) throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index b3d8a057c49..46359a1baeb 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -24,6 +24,54 @@ #include +/** ZooKeeper C++ library, a replacement for libzookeeper. + * + * Motivation. + * + * libzookeeper has many bugs: + * - segfaults: for example, if zookeeper connection was interrupted while reading result of multi response; + * - memory corruption: for example, as a result of double free inside libzookeeper; + * - no timeouts for synchronous operations: they may stuck forever under simple Jepsen-like tests; + * - logical errors: for example, chroot prefix is not removed from the results of multi responses. + * - data races; + * + * The code of libzookeeper is over complicated: + * - memory ownership is unclear and bugs are very difficult to track and fix. + * - extremely creepy code for implementation of "chroot" feature. + * + * As of 2018, there are no active maintainers of libzookeeper: + * - bugs in JIRA are fixed only occasionaly with ad-hoc patches by library users. + * + * libzookeeper is a classical example of bad code written in C. + * + * In Go, Python and Rust programming languages, + * there are separate libraries for ZooKeeper, not based on libzookeeper. + * Motivation is almost the same. Example: + * https://github.com/python-zk/kazoo/blob/master/docs/implementation.rst + * + * About "session restore" feature. + * + * libzookeeper has the feature of session restore. Client receives session id and session token from the server, + * and when connection is lost, it can quickly reconnect to any server with the same session id and token, + * to continue with existing session. + * libzookeeper performs this reconnection automatically. + * + * This feature is proven to be harmful. + * For example, it makes very difficult to correctly remove ephemeral nodes. + * This may lead to weird bugs in application code. + * For example, our developers have found that type of bugs in Curator Java library. + * + * On the other side, session restore feature has no advantages, + * because every application should be able to establish new session and reinitialize internal state, + * when the session is lost and cannot be restored. + * + * This library never restores the session. In case of any error, the session is considered as expired + * and you should create a new instance of ZooKeeper object and reinitialize the application state. + * + * This library is not intended to be CPU efficient. Hundreds of thousands operations per second is usually enough. + */ + + namespace CurrentMetrics { extern const Metric ZooKeeperSession; @@ -61,7 +109,7 @@ public: * - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap: * for example, just signal a condvar / fulfull a promise. * - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap. - * - whenever you receive SessionExpired exception of method isValid returns false, + * - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true, * the ZooKeeper instance is no longer usable - you may only destroy it and probably create another. * - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event. * - data for callbacks must be alive when ZooKeeper instance is alive. @@ -391,6 +439,8 @@ public: using CheckCallback = std::function; using MultiCallback = std::function; + /// If the method will throw exception, callbacks won't be called. + /// After the method is executed successfully, you must wait for callbacks. void create( const String & path, @@ -525,9 +575,9 @@ private: clock::time_point time; }; - using RequestsQueue = ConcurrentBoundedQueue; + using RequestsQueue = ConcurrentBoundedQueue; - RequestsQueue requests{1}; + RequestsQueue requests_queue{1}; void pushRequest(RequestInfo && request); using Operations = std::map; @@ -571,7 +621,7 @@ private: template void read(T &); - CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperSession}; + CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession}; }; }; diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index f7558dbf3eb..8cb570bbf62 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -339,6 +339,21 @@ void IProfilingBlockInputStream::cancel(bool kill) } +bool IProfilingBlockInputStream::isCancelled() const +{ + return is_cancelled; +} + +bool IProfilingBlockInputStream::isCancelledOrThrowIfKilled() const +{ + if (!is_cancelled) + return false; + if (is_killed) + throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED); + return true; +} + + void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & callback) { progress_callback = callback; diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.h b/dbms/src/DataStreams/IProfilingBlockInputStream.h index 0bed471f245..a9601d5c265 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.h @@ -119,21 +119,8 @@ public: */ virtual void cancel(bool kill); - /** Do you want to abort the receipt of data. - */ - bool isCancelled() const - { - return is_cancelled.load(std::memory_order_seq_cst); - } - - bool isCancelledOrThrowIfKilled() const - { - if (!isCancelled()) - return false; - if (is_killed) - throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED); - return true; - } + bool isCancelled() const; + bool isCancelledOrThrowIfKilled() const; /** What limitations and quotas should be checked. * LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check). @@ -189,7 +176,7 @@ public: protected: BlockStreamProfileInfo info; std::atomic is_cancelled{false}; - bool is_killed{false}; + std::atomic is_killed{false}; ProgressCallback progress_callback; ProcessListElement * process_list_elem = nullptr; diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp index 7464e94e4be..51fb62ef5a1 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp @@ -47,7 +47,27 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const void RemoteBlockOutputStream::write(const Block & block) { assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream"); - connection.sendData(block); + + try + { + connection.sendData(block); + } + catch (const NetException & e) + { + /// Try to get more detailed exception from server + if (connection.poll(0)) + { + Connection::Packet packet = connection.receivePacket(); + + if (Protocol::Server::Exception == packet.type) + { + packet.exception->rethrow(); + return; + } + } + + throw; + } } diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 2b84a413778..757aeecffa0 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -513,6 +513,7 @@ void ExpressionAnalyzer::analyzeAggregation() { getRootActions(select_query->array_join_expression_list(), true, false, temp_actions); addMultipleArrayJoinAction(temp_actions); + array_join_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); } if (select_query) @@ -1520,7 +1521,8 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block & else { NamesAndTypesList temp_columns = source_columns; - temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end()); + temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end()); + temp_columns.insert(temp_columns.end(), columns_added_by_join.begin(), columns_added_by_join.end()); ExpressionActionsPtr temp_actions = std::make_shared(temp_columns, settings); getRootActions(func->arguments->children.at(0), true, false, temp_actions); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index 3422506c144..e50ae568ad0 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -159,6 +159,8 @@ private: /// Columns after ARRAY JOIN, JOIN, and/or aggregation. NamesAndTypesList aggregated_columns; + NamesAndTypesList array_join_columns; + /// The main table in FROM clause, if exists. StoragePtr storage; diff --git a/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp b/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp index 041f9bce8d5..82b7dcf2cd5 100644 --- a/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp +++ b/dbms/src/Parsers/ASTSelectWithUnionQuery.cpp @@ -26,7 +26,10 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F for (ASTs::const_iterator it = list_of_selects->children.begin(); it != list_of_selects->children.end(); ++it) { if (it != list_of_selects->children.begin()) - settings.ostr << settings.nl_or_ws << indent_str << hilite_keyword << "UNION ALL" << hilite_none << settings.nl_or_ws; + settings.ostr + << settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "") + << "UNION ALL" << (settings.hilite ? hilite_keyword : "") + << settings.nl_or_ws; (*it)->formatImpl(settings, state, frame); } diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 72caa5ffe53..53ca6c8699f 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -134,6 +134,7 @@ void TCPHandler::runImpl() * The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet. */ std::unique_ptr exception; + bool network_error = false; try { @@ -183,6 +184,10 @@ void TCPHandler::runImpl() if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT) throw; + + /// If a timeout occurred, try to inform client about it and close the session + if (e.code() == ErrorCodes::SOCKET_TIMEOUT) + network_error = true; } catch (const Poco::Net::NetException & e) { @@ -211,8 +216,6 @@ void TCPHandler::runImpl() exception = std::make_unique("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION); } - bool network_error = false; - try { if (exception) @@ -251,6 +254,14 @@ void TCPHandler::runImpl() void TCPHandler::readData(const Settings & global_settings) { + auto receive_timeout = query_context.getSettingsRef().receive_timeout.value; + + /// Poll interval should not be greater than receive_timeout + size_t default_poll_interval = global_settings.poll_interval.value * 1000000; + size_t current_poll_interval = static_cast(receive_timeout.totalMicroseconds()); + constexpr size_t min_poll_interval = 5000; // 5 ms + size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval)); + while (1) { Stopwatch watch(CLOCK_MONOTONIC_COARSE); @@ -258,7 +269,7 @@ void TCPHandler::readData(const Settings & global_settings) /// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down. while (1) { - if (static_cast(*in).poll(global_settings.poll_interval * 1000000)) + if (static_cast(*in).poll(poll_interval)) break; /// Do we need to shut down? @@ -269,8 +280,16 @@ void TCPHandler::readData(const Settings & global_settings) * If we periodically poll, the receive_timeout of the socket itself does not work. * Therefore, an additional check is added. */ - if (watch.elapsedSeconds() > global_settings.receive_timeout.totalSeconds()) - throw Exception("Timeout exceeded while receiving data from client", ErrorCodes::SOCKET_TIMEOUT); + double elapsed = watch.elapsedSeconds(); + if (elapsed > receive_timeout.totalSeconds()) + { + std::stringstream ss; + ss << "Timeout exceeded while receiving data from client."; + ss << " Waited for " << static_cast(elapsed) << " seconds,"; + ss << " timeout is " << receive_timeout.totalSeconds() << " seconds."; + + throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT); + } } /// If client disconnected. @@ -560,7 +579,7 @@ bool TCPHandler::receivePacket() return false; default: - throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT); + throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT); } } diff --git a/dbms/src/Server/config.d/zookeeper.xml b/dbms/src/Server/config.d/zookeeper.xml index d390a935107..095f4be78c1 100644 --- a/dbms/src/Server/config.d/zookeeper.xml +++ b/dbms/src/Server/config.d/zookeeper.xml @@ -4,5 +4,13 @@ localhost 2181 + + yandex.ru + 2181 + + + 111.0.1.2 + 2181 + diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 19daa71d3e3..fd78090c9ec 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -2172,7 +2172,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit() return total_covered_parts; } -bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const +bool MergeTreeData::isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const { String column_name = node->getColumnName(); @@ -2180,9 +2180,12 @@ bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & if (column_name == column.column_name) return true; + if (partition_expr_ast && partition_expr_ast->children.at(0)->getColumnName() == column_name) + return true; + if (const ASTFunction * func = typeid_cast(node.get())) if (func->arguments->children.size() == 1) - return isPrimaryKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front()); + return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front()); return false; } @@ -2195,15 +2198,15 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple") { for (const auto & item : left_in_operand_tuple->arguments->children) - if (isPrimaryKeyColumnPossiblyWrappedInFunctions(item)) + if (isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(item)) return true; /// The tuple itself may be part of the primary key, so check that as a last resort. - return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand); + return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand); } else { - return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand); + return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f412419459c..d0b47b095d3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -652,7 +652,7 @@ private: std::lock_guard & data_parts_lock) const; /// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument. - bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &node) const; + bool isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const; }; } diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index 1d80ea38a87..11bffdace85 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -1006,6 +1006,10 @@ bool PKCondition::mayBeTrueInRangeImpl(const std::vector & key_ranges, co rpn_stack.back() = !rpn_stack.back(); } } + else + { + throw Exception("Set for IN is not created yet!", ErrorCodes::LOGICAL_ERROR); + } } else if (element.function == RPNElement::FUNCTION_NOT) { diff --git a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.reference b/dbms/tests/queries/0_stateless/00167_settings_inside_query.reference similarity index 57% rename from dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.reference rename to dbms/tests/queries/0_stateless/00167_settings_inside_query.reference index cd62bbbf596..3e67fe1ac4f 100644 --- a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.reference +++ b/dbms/tests/queries/0_stateless/00167_settings_inside_query.reference @@ -1,4 +1,2 @@ 123 123 -61 -62 diff --git a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.sql b/dbms/tests/queries/0_stateless/00167_settings_inside_query.sql similarity index 57% rename from dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.sql rename to dbms/tests/queries/0_stateless/00167_settings_inside_query.sql index 6a892987e39..987a9475b8d 100644 --- a/dbms/tests/queries/0_stateless/00167_shard_settings_inside_query.sql +++ b/dbms/tests/queries/0_stateless/00167_settings_inside_query.sql @@ -1,3 +1,2 @@ SELECT min(number) FROM system.numbers WHERE toUInt64(number % 1000) IN (SELECT DISTINCT blockSize() FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') SETTINGS max_rows_to_read = 1000000, read_overflow_mode = 'break'; SELECT * FROM (SELECT DISTINCT blockSize() AS x FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break'); -SELECT x FROM (SELECT DISTINCT blockSize() AS x FROM remote('127.0.0.{2,3}', system.numbers) WHERE number IN (SELECT number * 2 FROM system.numbers SETTINGS max_rows_to_read = 10000, read_overflow_mode = 'break') SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') ORDER BY x; diff --git a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference index 7193c3d3f3d..0d72c2d7fe0 100644 --- a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference +++ b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments8.reference @@ -1 +1,3 @@ Still alive +Still alive +Still alive diff --git a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference index 7193c3d3f3d..0d72c2d7fe0 100644 --- a/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference +++ b/dbms/tests/queries/0_stateless/00534_long_functions_bad_arguments9.reference @@ -1 +1,3 @@ Still alive +Still alive +Still alive diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference index de69348862a..351e10ca3ff 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.reference @@ -11,3 +11,27 @@ key, arrayJoin(arr) in ((1, 1), (2, 2)) (key, left array join arr) in ((1, 1), (2, 2)) 1 2 +all +1 [1] +2 [2] +key, arrayJoin(n.x) in (1, 1) +1 1 +key, arrayJoin(n.x) in ((1, 1), (2, 2)) +1 1 +2 2 +(key, left array join n.x) in (1, 1) +1 +(key, left array join n.x) in ((1, 1), (2, 2)) +1 +2 +max(key) from tab where (key, left array join n.x) in (1, 1) +1 +1 +max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2)) +2 +2 +max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1) +1 +max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2)) +2 +1 diff --git a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql index f6e34909cae..54a38c5fcca 100644 --- a/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql +++ b/dbms/tests/queries/0_stateless/00612_pk_in_tuple.sql @@ -14,3 +14,32 @@ select key from test.tab left array join arr as val where (key, val) in (1, 1); select '(key, left array join arr) in ((1, 1), (2, 2))'; select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key; +drop table if exists test.tab; +create table test.tab (key UInt64, n Nested(x UInt64)) Engine = MergeTree order by key; +insert into test.tab values (1, [1]); +insert into test.tab values (2, [2]); +select 'all'; +select * from test.tab order by key; +select 'key, arrayJoin(n.x) in (1, 1)'; +select key, arrayJoin(n.x) as val from test.tab where (key, val) in (1, 1); +select 'key, arrayJoin(n.x) in ((1, 1), (2, 2))'; +select key, arrayJoin(n.x) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key; +select '(key, left array join n.x) in (1, 1)'; +select key from test.tab left array join n.x as val where (key, val) in (1, 1); +select '(key, left array join n.x) in ((1, 1), (2, 2))'; +select key from test.tab left array join n.x as val where (key, val) in ((1, 1), (2, 2)) order by key; +select 'max(key) from tab where (key, left array join n.x) in (1, 1)'; +select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1)); +select max(key) from test.tab left array join n as val where (key, val.x) in (1, 1); +select 'max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))'; +select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1), (2, 2)); +select max(key) from test.tab left array join n as val where (key, val.x) in ((1, 1), (2, 2)); +select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)'; +select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in (1, 1); +select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))'; +select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in ((1, 1), (2, 2)); + +drop table if exists test.tab; +CREATE TABLE test.tab (key1 Int32, id1 Int64, c1 Int64) ENGINE = MergeTree PARTITION BY id1 ORDER BY (key1) ; +insert into test.tab values ( -1, 1, 0 ); +SELECT count(*) FROM test.tab PREWHERE id1 IN (1);