Merge remote-tracking branch 'upstream/master' into fix3

This commit is contained in:
proller 2018-04-05 13:07:08 +03:00
commit 96feeea402
26 changed files with 320 additions and 106 deletions

View File

@ -1,4 +1,4 @@
# ClickHouse 1.1.54370 Release Candidate, 2018-03-16
# ClickHouse release 1.1.54370, 2018-03-16
## New features:

View File

@ -1,4 +1,4 @@
# ClickHouse 1.1.54370 Release Candidate, 2018-03-16
# ClickHouse release 1.1.54370, 2018-03-16
## Новые возможности:

View File

@ -167,6 +167,8 @@ if (Poco_SQLODBC_FOUND)
target_link_libraries (dbms ${Poco_SQLODBC_LIBRARY} ${Poco_SQL_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS} PUBLIC ${Poco_SQL_INCLUDE_DIRS})
endif()
target_include_directories (clickhouse_common_io PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include)
target_include_directories (dbms PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/poco/Data/include)
if (Poco_DataODBC_FOUND)
target_link_libraries (clickhouse_common_io ${Poco_Data_LIBRARY})

View File

@ -234,7 +234,7 @@ bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping");
TimeoutSetter timeout_setter(*socket, sync_request_timeout);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
try
{
UInt64 pong = 0;
@ -274,7 +274,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req
if (!connected)
connect();
TimeoutSetter timeout_setter(*socket, sync_request_timeout);
TimeoutSetter timeout_setter(*socket, sync_request_timeout, true);
writeVarUInt(Protocol::Client::TablesStatusRequest, *out);
request.write(*out, server_revision);

View File

@ -8,24 +8,25 @@ namespace DB
{
/// Temporarily overrides socket send/recieve timeouts and reset them back into destructor
/// Timeouts could be only decreased
/// If "limit_max_timeout" is true, timeouts could be only decreased (maxed by previous value).
struct TimeoutSetter
{
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_)
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & send_timeout_, const Poco::Timespan & recieve_timeout_,
bool limit_max_timeout = false)
: socket(socket_), send_timeout(send_timeout_), recieve_timeout(recieve_timeout_)
{
old_send_timeout = socket.getSendTimeout();
old_receive_timeout = socket.getReceiveTimeout();
if (old_send_timeout > send_timeout)
if (!limit_max_timeout || old_send_timeout > send_timeout)
socket.setSendTimeout(send_timeout);
if (old_receive_timeout > recieve_timeout)
if (!limit_max_timeout || old_receive_timeout > recieve_timeout)
socket.setReceiveTimeout(recieve_timeout);
}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_)
: TimeoutSetter(socket_, timeout_, timeout_) {}
TimeoutSetter(Poco::Net::StreamSocket & socket_, const Poco::Timespan & timeout_, bool limit_max_timeout = false)
: TimeoutSetter(socket_, timeout_, timeout_, limit_max_timeout) {}
~TimeoutSetter()
{

View File

@ -12,7 +12,7 @@
#include <Common/randomSeed.h>
#define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000
#define ZOOKEEPER_OPERATION_TIMEOUT_MS 1000
#define ZOOKEEPER_OPERATION_TIMEOUT_MS 10000
namespace DB
@ -49,6 +49,9 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
session_timeout_ms = session_timeout_ms_;
chroot = chroot_;
if (hosts.empty())
throw KeeperException("No addresses passed to ZooKeeper constructor.", ZooKeeperImpl::ZooKeeper::ZBADARGUMENTS);
std::vector<std::string> addresses_strings;
boost::split(addresses_strings, hosts, boost::is_any_of(","));
ZooKeeperImpl::ZooKeeper::Addresses addresses;
@ -320,7 +323,7 @@ bool ZooKeeper::exists(const std::string & path, Stat * stat, const EventPtr & w
return existsWatch(path, stat, callbackForEvent(watch));
}
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback)
bool ZooKeeper::existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback)
{
int32_t code = existsImpl(path, stat, watch_callback);
@ -369,7 +372,7 @@ bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat,
return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
}
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * return_code)
bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * return_code)
{
int32_t code = getImpl(path, res, stat, watch_callback);
@ -527,39 +530,53 @@ void ZooKeeper::tryRemoveRecursive(const std::string & path)
}
void ZooKeeper::waitForDisappear(const std::string & path)
namespace
{
while (true)
struct WaitForDisappearState
{
int32_t code = 0;
int32_t event_type = 0;
Poco::Event event;
};
using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>;
}
auto callback = [&](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response)
void ZooKeeper::waitForDisappear(const std::string & path)
{
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
while (true)
{
auto callback = [state](const ZooKeeperImpl::ZooKeeper::ExistsResponse & response)
{
code = response.error;
if (code)
event.set();
state->code = response.error;
if (state->code)
state->event.set();
};
auto watch = [&](const ZooKeeperImpl::ZooKeeper::WatchResponse & response)
auto watch = [state](const ZooKeeperImpl::ZooKeeper::WatchResponse & response)
{
code = response.error;
if (!code)
event_type = response.type;
event.set();
if (!state->code)
{
state->code = response.error;
if (!state->code)
state->event_type = response.type;
state->event.set();
}
};
/// NOTE: if the node doesn't exist, the watch will leak.
impl->exists(path, callback, watch);
event.wait();
state->event.wait();
if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
if (state->code == ZooKeeperImpl::ZooKeeper::ZNONODE)
return;
if (code)
throw KeeperException(code, path);
if (state->code)
throw KeeperException(state->code, path);
if (event_type == ZooKeeperImpl::ZooKeeper::DELETED)
if (state->event_type == ZooKeeperImpl::ZooKeeper::DELETED)
return;
}
}

View File

@ -110,7 +110,7 @@ public:
int32_t tryRemove(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Stat * stat, const WatchCallback & watch_callback);
bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
@ -118,7 +118,7 @@ public:
/// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, const WatchCallback & watch_callback, int * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * code = nullptr);
void set(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);

View File

@ -1,6 +1,7 @@
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
@ -505,7 +506,7 @@ ZooKeeper::ZooKeeper(
Poco::Timespan operation_timeout)
: root_path(root_path_),
session_timeout(session_timeout),
operation_timeout(operation_timeout)
operation_timeout(std::min(operation_timeout, session_timeout))
{
if (!root_path.empty())
{
@ -532,9 +533,6 @@ ZooKeeper::ZooKeeper(
connect(addresses, connection_timeout);
sendHandshake();
receiveHandshake();
if (!auth_scheme.empty())
sendAuth(auth_scheme, auth_data);
@ -549,6 +547,9 @@ void ZooKeeper::connect(
const Addresses & addresses,
Poco::Timespan connection_timeout)
{
if (addresses.empty())
throw Exception("No addresses passed to ZooKeeperImpl constructor", ZBADARGUMENTS);
static constexpr size_t num_tries = 3;
bool connected = false;
@ -559,13 +560,25 @@ void ZooKeeper::connect(
{
try
{
socket = Poco::Net::StreamSocket(); /// Reset the state of previous attempt.
socket.connect(address, connection_timeout);
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
socket.setNoDelay(true);
in.emplace(socket);
out.emplace(socket);
sendHandshake();
receiveHandshake();
connected = true;
break;
}
catch (const Poco::Net::NetException & e)
{
fail_reasons << "\n" << getCurrentExceptionMessage(false);
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << address.toString();
}
catch (const Poco::TimeoutException & e)
{
@ -591,16 +604,9 @@ void ZooKeeper::connect(
out << address.toString();
}
out << fail_reasons.str();
out << fail_reasons.str() << "\n";
throw Exception(out.str(), ZCONNECTIONLOSS);
}
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
socket.setNoDelay(true);
in.emplace(socket);
out.emplace(socket);
}
@ -685,6 +691,8 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
void ZooKeeper::sendThread()
{
setThreadName("ZooKeeperSend");
auto prev_heartbeat_time = clock::now();
try
@ -703,12 +711,26 @@ void ZooKeeper::sendThread()
std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count(),
operation_timeout.totalMilliseconds());
RequestPtr request;
if (requests.tryPop(request, max_wait))
RequestInfo info;
if (requests_queue.tryPop(info, max_wait))
{
request->write(*out);
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (request->xid == close_xid)
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
info.request->write(*out);
if (info.request->xid == close_xid)
break;
}
}
@ -732,14 +754,31 @@ void ZooKeeper::sendThread()
}
/// Drain queue
RequestPtr request;
while (requests.tryPop(request))
;
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
info.callback(*response);
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
info.watch(response);
}
}
}
void ZooKeeper::receiveThread()
{
setThreadName("ZooKeeperRecv");
try
{
Int64 waited = 0;
@ -970,6 +1009,8 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
bool expired_prev = false;
if (expired.compare_exchange_strong(expired_prev, true))
{
active_session_metric_increment.destroy();
try
{
if (!error_send)
@ -992,7 +1033,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZCONNECTIONLOSS;
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
request_info.callback(*response);
}
@ -1009,7 +1050,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZCONNECTIONLOSS;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
if (callback)
@ -1244,21 +1285,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
operations[info.request->xid] = info;
}
if (info.watch)
{
info.request->has_watch = true;
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
std::lock_guard lock(watches_mutex);
watches[info.request->getPath()].emplace_back(std::move(info.watch));
}
if (!requests.tryPush(info.request, operation_timeout.totalMilliseconds()))
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
}

View File

@ -24,6 +24,54 @@
#include <functional>
/** ZooKeeper C++ library, a replacement for libzookeeper.
*
* Motivation.
*
* libzookeeper has many bugs:
* - segfaults: for example, if zookeeper connection was interrupted while reading result of multi response;
* - memory corruption: for example, as a result of double free inside libzookeeper;
* - no timeouts for synchronous operations: they may stuck forever under simple Jepsen-like tests;
* - logical errors: for example, chroot prefix is not removed from the results of multi responses.
* - data races;
*
* The code of libzookeeper is over complicated:
* - memory ownership is unclear and bugs are very difficult to track and fix.
* - extremely creepy code for implementation of "chroot" feature.
*
* As of 2018, there are no active maintainers of libzookeeper:
* - bugs in JIRA are fixed only occasionaly with ad-hoc patches by library users.
*
* libzookeeper is a classical example of bad code written in C.
*
* In Go, Python and Rust programming languages,
* there are separate libraries for ZooKeeper, not based on libzookeeper.
* Motivation is almost the same. Example:
* https://github.com/python-zk/kazoo/blob/master/docs/implementation.rst
*
* About "session restore" feature.
*
* libzookeeper has the feature of session restore. Client receives session id and session token from the server,
* and when connection is lost, it can quickly reconnect to any server with the same session id and token,
* to continue with existing session.
* libzookeeper performs this reconnection automatically.
*
* This feature is proven to be harmful.
* For example, it makes very difficult to correctly remove ephemeral nodes.
* This may lead to weird bugs in application code.
* For example, our developers have found that type of bugs in Curator Java library.
*
* On the other side, session restore feature has no advantages,
* because every application should be able to establish new session and reinitialize internal state,
* when the session is lost and cannot be restored.
*
* This library never restores the session. In case of any error, the session is considered as expired
* and you should create a new instance of ZooKeeper object and reinitialize the application state.
*
* This library is not intended to be CPU efficient. Hundreds of thousands operations per second is usually enough.
*/
namespace CurrentMetrics
{
extern const Metric ZooKeeperSession;
@ -61,7 +109,7 @@ public:
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
* for example, just signal a condvar / fulfull a promise.
* - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
* - whenever you receive SessionExpired exception of method isValid returns false,
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
* - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event.
* - data for callbacks must be alive when ZooKeeper instance is alive.
@ -391,6 +439,8 @@ public:
using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
/// If the method will throw exception, callbacks won't be called.
/// After the method is executed successfully, you must wait for callbacks.
void create(
const String & path,
@ -525,9 +575,9 @@ private:
clock::time_point time;
};
using RequestsQueue = ConcurrentBoundedQueue<RequestPtr>;
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests{1};
RequestsQueue requests_queue{1};
void pushRequest(RequestInfo && request);
using Operations = std::map<XID, RequestInfo>;
@ -571,7 +621,7 @@ private:
template <typename T>
void read(T &);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ZooKeeperSession};
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
};
};

View File

@ -339,6 +339,21 @@ void IProfilingBlockInputStream::cancel(bool kill)
}
bool IProfilingBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IProfilingBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;

View File

@ -119,21 +119,8 @@ public:
*/
virtual void cancel(bool kill);
/** Do you want to abort the receipt of data.
*/
bool isCancelled() const
{
return is_cancelled.load(std::memory_order_seq_cst);
}
bool isCancelledOrThrowIfKilled() const
{
if (!isCancelled())
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** What limitations and quotas should be checked.
* LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check).
@ -189,7 +176,7 @@ public:
protected:
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
bool is_killed{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
ProcessListElement * process_list_elem = nullptr;

View File

@ -47,7 +47,27 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const
void RemoteBlockOutputStream::write(const Block & block)
{
assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream");
connection.sendData(block);
try
{
connection.sendData(block);
}
catch (const NetException & e)
{
/// Try to get more detailed exception from server
if (connection.poll(0))
{
Connection::Packet packet = connection.receivePacket();
if (Protocol::Server::Exception == packet.type)
{
packet.exception->rethrow();
return;
}
}
throw;
}
}

View File

@ -513,6 +513,7 @@ void ExpressionAnalyzer::analyzeAggregation()
{
getRootActions(select_query->array_join_expression_list(), true, false, temp_actions);
addMultipleArrayJoinAction(temp_actions);
array_join_columns = temp_actions->getSampleBlock().getNamesAndTypesList();
}
if (select_query)
@ -1520,7 +1521,8 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(const ASTPtr & node, const Block &
else
{
NamesAndTypesList temp_columns = source_columns;
temp_columns.insert(temp_columns.end(), aggregated_columns.begin(), aggregated_columns.end());
temp_columns.insert(temp_columns.end(), array_join_columns.begin(), array_join_columns.end());
temp_columns.insert(temp_columns.end(), columns_added_by_join.begin(), columns_added_by_join.end());
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(temp_columns, settings);
getRootActions(func->arguments->children.at(0), true, false, temp_actions);

View File

@ -159,6 +159,8 @@ private:
/// Columns after ARRAY JOIN, JOIN, and/or aggregation.
NamesAndTypesList aggregated_columns;
NamesAndTypesList array_join_columns;
/// The main table in FROM clause, if exists.
StoragePtr storage;

View File

@ -26,7 +26,10 @@ void ASTSelectWithUnionQuery::formatQueryImpl(const FormatSettings & settings, F
for (ASTs::const_iterator it = list_of_selects->children.begin(); it != list_of_selects->children.end(); ++it)
{
if (it != list_of_selects->children.begin())
settings.ostr << settings.nl_or_ws << indent_str << hilite_keyword << "UNION ALL" << hilite_none << settings.nl_or_ws;
settings.ostr
<< settings.nl_or_ws << indent_str << (settings.hilite ? hilite_keyword : "")
<< "UNION ALL" << (settings.hilite ? hilite_keyword : "")
<< settings.nl_or_ws;
(*it)->formatImpl(settings, state, frame);
}

View File

@ -134,6 +134,7 @@ void TCPHandler::runImpl()
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
*/
std::unique_ptr<Exception> exception;
bool network_error = false;
try
{
@ -183,6 +184,10 @@ void TCPHandler::runImpl()
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
/// If a timeout occurred, try to inform client about it and close the session
if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
network_error = true;
}
catch (const Poco::Net::NetException & e)
{
@ -211,8 +216,6 @@ void TCPHandler::runImpl()
exception = std::make_unique<Exception>("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
bool network_error = false;
try
{
if (exception)
@ -251,6 +254,14 @@ void TCPHandler::runImpl()
void TCPHandler::readData(const Settings & global_settings)
{
auto receive_timeout = query_context.getSettingsRef().receive_timeout.value;
/// Poll interval should not be greater than receive_timeout
size_t default_poll_interval = global_settings.poll_interval.value * 1000000;
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
while (1)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@ -258,7 +269,7 @@ void TCPHandler::readData(const Settings & global_settings)
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (1)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000))
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
break;
/// Do we need to shut down?
@ -269,8 +280,16 @@ void TCPHandler::readData(const Settings & global_settings)
* If we periodically poll, the receive_timeout of the socket itself does not work.
* Therefore, an additional check is added.
*/
if (watch.elapsedSeconds() > global_settings.receive_timeout.totalSeconds())
throw Exception("Timeout exceeded while receiving data from client", ErrorCodes::SOCKET_TIMEOUT);
double elapsed = watch.elapsedSeconds();
if (elapsed > receive_timeout.totalSeconds())
{
std::stringstream ss;
ss << "Timeout exceeded while receiving data from client.";
ss << " Waited for " << static_cast<size_t>(elapsed) << " seconds,";
ss << " timeout is " << receive_timeout.totalSeconds() << " seconds.";
throw Exception(ss.str(), ErrorCodes::SOCKET_TIMEOUT);
}
}
/// If client disconnected.
@ -560,7 +579,7 @@ bool TCPHandler::receivePacket()
return false;
default:
throw Exception("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
throw Exception("Unknown packet " + toString(packet_type) + " from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
}

View File

@ -4,5 +4,13 @@
<host>localhost</host>
<port>2181</port>
</node>
<node>
<host>yandex.ru</host>
<port>2181</port>
</node>
<node>
<host>111.0.1.2</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>

View File

@ -2172,7 +2172,7 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit()
return total_covered_parts;
}
bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
bool MergeTreeData::isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const
{
String column_name = node->getColumnName();
@ -2180,9 +2180,12 @@ bool MergeTreeData::isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &
if (column_name == column.column_name)
return true;
if (partition_expr_ast && partition_expr_ast->children.at(0)->getColumnName() == column_name)
return true;
if (const ASTFunction * func = typeid_cast<const ASTFunction *>(node.get()))
if (func->arguments->children.size() == 1)
return isPrimaryKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front());
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(func->arguments->children.front());
return false;
}
@ -2195,15 +2198,15 @@ bool MergeTreeData::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) con
if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple")
{
for (const auto & item : left_in_operand_tuple->arguments->children)
if (isPrimaryKeyColumnPossiblyWrappedInFunctions(item))
if (isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(item))
return true;
/// The tuple itself may be part of the primary key, so check that as a last resort.
return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand);
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand);
}
else
{
return isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand);
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(left_in_operand);
}
}

View File

@ -652,7 +652,7 @@ private:
std::lock_guard<std::mutex> & data_parts_lock) const;
/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr &node) const;
bool isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;
};
}

View File

@ -1006,6 +1006,10 @@ bool PKCondition::mayBeTrueInRangeImpl(const std::vector<Range> & key_ranges, co
rpn_stack.back() = !rpn_stack.back();
}
}
else
{
throw Exception("Set for IN is not created yet!", ErrorCodes::LOGICAL_ERROR);
}
}
else if (element.function == RPNElement::FUNCTION_NOT)
{

View File

@ -1,3 +1,2 @@
SELECT min(number) FROM system.numbers WHERE toUInt64(number % 1000) IN (SELECT DISTINCT blockSize() FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') SETTINGS max_rows_to_read = 1000000, read_overflow_mode = 'break';
SELECT * FROM (SELECT DISTINCT blockSize() AS x FROM system.numbers SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break');
SELECT x FROM (SELECT DISTINCT blockSize() AS x FROM remote('127.0.0.{2,3}', system.numbers) WHERE number IN (SELECT number * 2 FROM system.numbers SETTINGS max_rows_to_read = 10000, read_overflow_mode = 'break') SETTINGS max_block_size = 123, max_rows_to_read = 1000, read_overflow_mode = 'break') ORDER BY x;

View File

@ -1 +1,3 @@
Still alive
Still alive
Still alive

View File

@ -1 +1,3 @@
Still alive
Still alive
Still alive

View File

@ -11,3 +11,27 @@ key, arrayJoin(arr) in ((1, 1), (2, 2))
(key, left array join arr) in ((1, 1), (2, 2))
1
2
all
1 [1]
2 [2]
key, arrayJoin(n.x) in (1, 1)
1 1
key, arrayJoin(n.x) in ((1, 1), (2, 2))
1 1
2 2
(key, left array join n.x) in (1, 1)
1
(key, left array join n.x) in ((1, 1), (2, 2))
1
2
max(key) from tab where (key, left array join n.x) in (1, 1)
1
1
max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))
2
2
max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)
1
max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))
2
1

View File

@ -14,3 +14,32 @@ select key from test.tab left array join arr as val where (key, val) in (1, 1);
select '(key, left array join arr) in ((1, 1), (2, 2))';
select key from test.tab left array join arr as val where (key, val) in ((1, 1), (2, 2)) order by key;
drop table if exists test.tab;
create table test.tab (key UInt64, n Nested(x UInt64)) Engine = MergeTree order by key;
insert into test.tab values (1, [1]);
insert into test.tab values (2, [2]);
select 'all';
select * from test.tab order by key;
select 'key, arrayJoin(n.x) in (1, 1)';
select key, arrayJoin(n.x) as val from test.tab where (key, val) in (1, 1);
select 'key, arrayJoin(n.x) in ((1, 1), (2, 2))';
select key, arrayJoin(n.x) as val from test.tab where (key, val) in ((1, 1), (2, 2)) order by key;
select '(key, left array join n.x) in (1, 1)';
select key from test.tab left array join n.x as val where (key, val) in (1, 1);
select '(key, left array join n.x) in ((1, 1), (2, 2))';
select key from test.tab left array join n.x as val where (key, val) in ((1, 1), (2, 2)) order by key;
select 'max(key) from tab where (key, left array join n.x) in (1, 1)';
select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1));
select max(key) from test.tab left array join n as val where (key, val.x) in (1, 1);
select 'max(key) from tab where (key, left array join n.x) in ((1, 1), (2, 2))';
select max(key) from test.tab left array join `n.x` as val where (key, val) in ((1, 1), (2, 2));
select max(key) from test.tab left array join n as val where (key, val.x) in ((1, 1), (2, 2));
select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in (1, 1)';
select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in (1, 1);
select 'max(key) from tab any left join (select key, arrayJoin(n.x) as val from tab) using key where (key, val) in ((1, 1), (2, 2))';
select max(key) from test.tab any left join (select key, arrayJoin(n.x) as val from test.tab) using key where (key, val) in ((1, 1), (2, 2));
drop table if exists test.tab;
CREATE TABLE test.tab (key1 Int32, id1 Int64, c1 Int64) ENGINE = MergeTree PARTITION BY id1 ORDER BY (key1) ;
insert into test.tab values ( -1, 1, 0 );
SELECT count(*) FROM test.tab PREWHERE id1 IN (1);