diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp
index 9a8b580407a..8d3a1ba7c74 100644
--- a/programs/client/Client.cpp
+++ b/programs/client/Client.cpp
@@ -1900,6 +1900,9 @@ private:
         switch (packet.type)
         {
+            case Protocol::Server::PartUUIDs:
+                return true;
+
             case Protocol::Server::Data:
                 if (!cancelled)
                     onData(packet.block);
diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp
index 65b15a46955..e38a6b240a6 100644
--- a/src/Client/Connection.cpp
+++ b/src/Client/Connection.cpp
@@ -542,6 +542,12 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
         throttler->add(out->count() - prev_bytes);
 }
 
+void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
+{
+    writeVarUInt(Protocol::Client::IgnoredPartUUIDs, *out);
+    writeVectorBinary(uuids, *out);
+    out->next();
+}
 
 void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
 {
@@ -798,6 +804,10 @@ Packet Connection::receivePacket(std::function<void(Poco::Net::Socket &)> async_callback)
         case Protocol::Server::EndOfStream:
             return res;
 
+        case Protocol::Server::PartUUIDs:
+            readVectorBinary(res.part_uuids, *in);
+            return res;
+
         default:
             /// In unknown state, disconnect - to not leave unsynchronised connection.
             disconnect();
diff --git a/src/Client/Connection.h b/src/Client/Connection.h
index 83e8f3ba206..2d24b143d7a 100644
--- a/src/Client/Connection.h
+++ b/src/Client/Connection.h
@@ -66,6 +66,7 @@ struct Packet
     std::vector<String> multistring_message;
     Progress progress;
     BlockStreamProfileInfo profile_info;
+    std::vector<UUID> part_uuids;
 
     Packet() : type(Protocol::Server::Hello) {}
 };
@@ -157,6 +158,8 @@ public:
     void sendScalarsData(Scalars & data);
     /// Send all contents of external (temporary) tables.
     void sendExternalTablesData(ExternalTablesData & data);
+    /// Send parts' UUIDs to exclude them from query processing
+    void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
 
     /// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
     /// You could pass size of serialized/compressed block.
diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp
index ed7aad0a515..c50dd7b6454 100644
--- a/src/Client/MultiplexedConnections.cpp
+++ b/src/Client/MultiplexedConnections.cpp
@@ -140,6 +140,21 @@ void MultiplexedConnections::sendQuery(
     sent_query = true;
 }
 
+void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
+{
+    std::lock_guard lock(cancel_mutex);
+
+    if (sent_query)
+        throw Exception("Cannot send uuids after query is sent.", ErrorCodes::LOGICAL_ERROR);
+
+    for (ReplicaState & state : replica_states)
+    {
+        Connection * connection = state.connection;
+        if (connection != nullptr)
+            connection->sendIgnoredPartUUIDs(uuids);
+    }
+}
+
 Packet MultiplexedConnections::receivePacket()
 {
     std::lock_guard lock(cancel_mutex);
@@ -195,6 +210,7 @@ Packet MultiplexedConnections::drain()
         switch (packet.type)
         {
+            case Protocol::Server::PartUUIDs:
             case Protocol::Server::Data:
             case Protocol::Server::Progress:
             case Protocol::Server::ProfileInfo:
@@ -253,6 +269,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(std::function<void(Poco::Net::Socket &)> async_callback)
         switch (packet.type)
         {
+            case Protocol::Server::PartUUIDs:
             case Protocol::Server::Data:
             case Protocol::Server::Progress:
diff --git a/src/Client/MultiplexedConnections.h b/src/Client/MultiplexedConnections.h
--- a/src/Client/MultiplexedConnections.h
+++ b/src/Client/MultiplexedConnections.h
@@ ... @@ public:
+    /// Send parts' UUIDs to exclude them from query processing
+    void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
+
     /** On each replica, read and skip all packets to EndOfStream or Exception.
       * Returns EndOfStream if no exception has been received. Otherwise
       * returns the last received packet of type Exception.
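Note: the new packets reuse the existing framing helpers — writeVarUInt emits a LEB128-style variable-length integer, and writeVectorBinary writes a varUInt element count followed by the elements themselves. A minimal standalone sketch of the bytes that Connection::sendIgnoredPartUUIDs produces; the packet id 8 mirrors Protocol::Client::IgnoredPartUUIDs above, and the flat 16-byte UUID layout is an assumption for illustration:

#include <array>
#include <cstdint>
#include <vector>

using UUID128 = std::array<uint8_t, 16>;  /// stand-in for DB::UUID's on-wire form

/// LEB128-style variable-length unsigned integer, as in DB::writeVarUInt.
static void writeVarUInt(uint64_t x, std::vector<uint8_t> & out)
{
    while (x >= 0x80)
    {
        out.push_back(static_cast<uint8_t>(x) | 0x80);
        x >>= 7;
    }
    out.push_back(static_cast<uint8_t>(x));
}

std::vector<uint8_t> encodeIgnoredPartUUIDs(const std::vector<UUID128> & uuids)
{
    constexpr uint64_t IgnoredPartUUIDs = 8;  /// Protocol::Client::IgnoredPartUUIDs
    std::vector<uint8_t> buf;
    writeVarUInt(IgnoredPartUUIDs, buf);      /// packet type
    writeVarUInt(uuids.size(), buf);          /// element count, as writeVectorBinary does
    for (const auto & uuid : uuids)           /// raw POD payload per UUID
        buf.insert(buf.end(), uuid.begin(), uuid.end());
    return buf;
}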
diff --git a/src/Columns/ColumnsNumber.h b/src/Columns/ColumnsNumber.h
index 96ce2bd6d6f..17a28e617c3 100644
--- a/src/Columns/ColumnsNumber.h
+++ b/src/Columns/ColumnsNumber.h
@@ -26,4 +26,6 @@ using ColumnInt256 = ColumnVector<Int256>;
 using ColumnFloat32 = ColumnVector<Float32>;
 using ColumnFloat64 = ColumnVector<Float64>;
 
+using ColumnUUID = ColumnVector<UUID>;
+
 }
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index a2cd65137c0..09e5945f2b5 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -533,6 +533,7 @@
     M(564, INTERSERVER_SCHEME_DOESNT_MATCH) \
     M(565, TOO_MANY_PARTITIONS) \
     M(566, CANNOT_RMDIR) \
+    M(567, DUPLICATED_PART_UUIDS) \
     \
     M(999, KEEPER_EXCEPTION) \
     M(1000, POCO_EXCEPTION) \
diff --git a/src/Core/Protocol.h b/src/Core/Protocol.h
index f383e509751..df51a0cb61a 100644
--- a/src/Core/Protocol.h
+++ b/src/Core/Protocol.h
@@ -75,8 +75,9 @@ namespace Protocol
         TablesStatusResponse = 9,   /// A response to TablesStatus request.
         Log = 10,                   /// System logs of the query execution
         TableColumns = 11,          /// Columns' description for default values calculation
+        PartUUIDs = 12,             /// List of unique part ids.
 
-        MAX = TableColumns,
+        MAX = PartUUIDs,
     };
 
     /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@@ -98,6 +99,7 @@ namespace Protocol
             "TablesStatusResponse",
             "Log",
             "TableColumns",
+            "PartUUIDs",
         };
         return packet <= MAX
             ? data[packet]
@@ -132,8 +134,9 @@
         TablesStatusRequest = 5,    /// Check status of tables on the server.
         KeepAlive = 6,              /// Keep the connection alive
         Scalar = 7,                 /// A block of data (compressed or not).
+        IgnoredPartUUIDs = 8,       /// List of unique part ids to exclude from query processing
 
-        MAX = Scalar,
+        MAX = IgnoredPartUUIDs,
     };
 
     inline const char * toString(UInt64 packet)
@@ -147,6 +150,7 @@
             "TablesStatusRequest",
             "KeepAlive",
             "Scalar",
+            "IgnoredPartUUIDs",
         };
         return packet <= MAX
             ? data[packet]
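Note: the name tables and the MAX constants must stay in sync with the enums — the new entries occupy the next free ids on each side. A condensed, self-contained restatement of the server-side lookup that this hunk extends (values mirror Protocol::Server above):

#include <cstdint>

/// Mirrors Protocol::Server::toString(): PartUUIDs takes id 12,
/// and MAX moves from TableColumns (11) to PartUUIDs (12).
const char * serverPacketToString(uint64_t packet)
{
    static constexpr const char * data[] = {
        "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream",
        "ProfileInfo", "Totals", "Extremes", "TablesStatusResponse",
        "Log", "TableColumns", "PartUUIDs",
    };
    constexpr uint64_t MAX = 12;  /// Protocol::Server::PartUUIDs
    return packet <= MAX ? data[packet] : "Unknown packet";
}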
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index c4cf3803913..ecd3fa9e746 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -420,6 +420,8 @@ class IColumn;
     M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
     \
     M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) functions to countIf() function when logically equivalent", 0) \
+    M(Bool, allow_experimental_query_deduplication, false, "Allow sending parts' UUIDs for a query in order to deduplicate data parts if any", 0) \
+    \
     /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
     \
     M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp
index 14e51ffefdf..ce7db264eef 100644
--- a/src/DataStreams/RemoteQueryExecutor.cpp
+++ b/src/DataStreams/RemoteQueryExecutor.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
 
 namespace DB
 {
@@ -20,6 +21,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNKNOWN_PACKET_FROM_SERVER;
+    extern const int DUPLICATED_PART_UUIDS;
 }
 
 RemoteQueryExecutor::RemoteQueryExecutor(
@@ -158,6 +160,7 @@ void RemoteQueryExecutor::sendQuery()
     std::lock_guard guard(was_cancelled_mutex);
 
     established = true;
+    was_cancelled = false;
 
     auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
     ClientInfo modified_client_info = context.getClientInfo();
@@ -167,6 +170,14 @@ void RemoteQueryExecutor::sendQuery()
         modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
     }
 
+    {
+        std::lock_guard lock(duplicated_part_uuids_mutex);
+        if (!duplicated_part_uuids.empty())
+        {
+            multiplexed_connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
+        }
+    }
+
     multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
 
     established = false;
@@ -195,7 +206,29 @@ Block RemoteQueryExecutor::read()
         Packet packet = multiplexed_connections->receivePacket();
 
         if (auto block = processPacket(std::move(packet)))
+        {
+            if (got_duplicated_part_uuids)
+            {
+                /// Cancel previous query and disconnect before retry.
+                cancel();
+                multiplexed_connections->disconnect();
+
+                /// Only resend once, otherwise throw an exception
+                if (!resent_query)
+                {
+                    if (log)
+                        LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
+
+                    resent_query = true;
+                    sent_query = false;
+                    got_duplicated_part_uuids = false;
+                    /// The next read() will implicitly resend the query first.
+                    return read();
+                }
+                throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
+            }
             return *block;
+        }
     }
 }
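Note: the retry logic above is easy to misread because it recurses through read(). A condensed sketch of the control flow, with generic callables standing in for the executor internals (illustrative only, not the actual interface):

#include <functional>
#include <stdexcept>

/// Run one attempt; if it reports duplicate UUIDs, reset the connection
/// state and retry exactly once (the resent_query flag), then give up.
void readWithSingleRetry(const std::function<bool()> & attempt,  /// ~ read() up to the duplicate check
                         const std::function<void()> & reset)    /// ~ cancel() + disconnect()
{
    if (attempt())
        return;            /// no duplicate UUIDs reported

    reset();
    if (!attempt())        /// second pass still collided
        throw std::runtime_error("Found duplicate uuids while processing query.");
}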
@@ -233,7 +266,29 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext> & read_context)
         else
         {
             if (auto data = processPacket(std::move(read_context->packet)))
+            {
+                if (got_duplicated_part_uuids)
+                {
+                    /// Cancel previous query and disconnect before retry.
+                    cancel();
+                    multiplexed_connections->disconnect();
+
+                    /// Only resend once, otherwise throw an exception
+                    if (!resent_query)
+                    {
+                        if (log)
+                            LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
+
+                        resent_query = true;
+                        sent_query = false;
+                        got_duplicated_part_uuids = false;
+                        /// The next read() will implicitly resend the query first.
+                        return read(read_context);
+                    }
+                    throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
+                }
                 return std::move(*data);
+            }
         }
     }
     while (true);
@@ -246,6 +301,13 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
 {
     switch (packet.type)
     {
+        case Protocol::Server::PartUUIDs:
+            if (!setPartUUIDs(packet.part_uuids))
+            {
+                got_duplicated_part_uuids = true;
+                return Block();
+            }
+            break;
         case Protocol::Server::Data:
             /// If the block is not empty and is not a header block
             if (packet.block && (packet.block.rows() > 0))
@@ -306,6 +368,20 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
     return {};
 }
 
+bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
+{
+    Context & query_context = const_cast<Context &>(context).getQueryContext();
+    auto duplicates = query_context.getPartUUIDs()->add(uuids);
+
+    if (!duplicates.empty())
+    {
+        std::lock_guard lock(duplicated_part_uuids_mutex);
+        duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end());
+        return false;
+    }
+    return true;
+}
+
 void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
 {
     /** If one of:
@@ -383,6 +459,7 @@ void RemoteQueryExecutor::sendExternalTables()
 
     {
         std::lock_guard lock(external_tables_mutex);
 
+        external_tables_data.clear();
         external_tables_data.reserve(count);
 
         for (size_t i = 0; i < count; ++i)
@@ -446,7 +523,7 @@ bool RemoteQueryExecutor::isQueryPending() const
 bool RemoteQueryExecutor::hasThrownException() const
 {
-    return got_exception_from_replica || got_unknown_packet_from_replica;
+    return got_exception_from_replica || got_unknown_packet_from_replica || got_duplicated_part_uuids;
 }
 
 }
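Note the convention in processPacket(): std::nullopt means "keep receiving packets", while a default-constructed (empty) Block is the sentinel that wakes the read loop so it can inspect got_duplicated_part_uuids. A stripped-down illustration of that contract (FakeBlock is a stand-in type, not the real DB::Block):

#include <optional>

struct FakeBlock { size_t rows = 0; };   /// stand-in for DB::Block

/// nullopt     -> caller keeps receiving packets;
/// empty block -> caller returns to read(), which checks the duplicate flag.
std::optional<FakeBlock> handlePartUUIDsPacket(bool duplicates_found)
{
    if (duplicates_found)
        return FakeBlock{};   /// sentinel: forces the read loop to react
    return std::nullopt;      /// UUIDs accepted, wait for the next packet
}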
diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h
index 46d9d067563..843cf75f1f8 100644
--- a/src/DataStreams/RemoteQueryExecutor.h
+++ b/src/DataStreams/RemoteQueryExecutor.h
@@ -57,6 +57,9 @@ public:
     /// Create connection and send query, external tables and scalars.
     void sendQuery();
 
+    /// Whether the query was resent to a replica; on resend the query itself can be modified.
+    std::atomic<bool> resent_query { false };
+
     /// Read next block of data. Returns empty block if query is finished.
     Block read();
 
@@ -152,6 +155,14 @@ private:
       */
     std::atomic<bool> got_unknown_packet_from_replica { false };
 
+    /** Got duplicated uuids from a replica
+      */
+    std::atomic<bool> got_duplicated_part_uuids{ false };
+
+    /// Part UUIDs collected from remote replicas
+    std::mutex duplicated_part_uuids_mutex;
+    std::vector<UUID> duplicated_part_uuids;
+
     PoolMode pool_mode = PoolMode::GET_MANY;
     StorageID main_table = StorageID::createEmpty();
 
@@ -163,6 +174,10 @@ private:
     /// Send all temporary tables to remote servers
     void sendExternalTables();
 
+    /** Set part uuids to a query context, collected from remote replicas.
+      */
+    bool setPartUUIDs(const std::vector<UUID> & uuids);
+
     /// If wasn't sent yet, send request to cancel all connections to replicas
     void tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context);
 
@@ -174,6 +189,9 @@ private:
     /// Process packet for read and return data block if possible.
     std::optional<Block> processPacket(Packet packet);
+
+    /// Reads packet by packet
+    Block readPackets();
 };
 
 }
diff --git a/src/IO/WriteHelpers.h b/src/IO/WriteHelpers.h
index 9072f306bd9..a37a5b5ddc6 100644
--- a/src/IO/WriteHelpers.h
+++ b/src/IO/WriteHelpers.h
@@ -910,6 +910,7 @@
 inline void writeBinary(const StringRef & x, WriteBuffer & buf) { writeStringBinary(x, buf); }
 inline void writeBinary(const std::string_view & x, WriteBuffer & buf) { writeStringBinary(x, buf); }
 inline void writeBinary(const Int128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
 inline void writeBinary(const UInt128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
+inline void writeBinary(const UUID & x, WriteBuffer & buf) { writePODBinary(x, buf); }
 inline void writeBinary(const DummyUInt256 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
 inline void writeBinary(const Decimal32 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
 inline void writeBinary(const Decimal64 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 8ff317764a7..9a1fcf6a067 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -64,6 +64,7 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
 
 namespace ProfileEvents
@@ -2510,4 +2511,22 @@ StorageID Context::resolveStorageIDImpl(StorageID storage_id, StorageNamespace w
     return StorageID::createEmpty();
 }
 
+PartUUIDsPtr Context::getPartUUIDs()
+{
+    auto lock = getLock();
+    if (!part_uuids)
+        part_uuids = std::make_shared<PartUUIDs>();
+
+    return part_uuids;
+}
+
+PartUUIDsPtr Context::getIgnoredPartUUIDs()
+{
+    auto lock = getLock();
+    if (!ignored_part_uuids)
+        ignored_part_uuids = std::make_shared<PartUUIDs>();
+
+    return ignored_part_uuids;
+}
+
 }
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index 5801cc2b949..4dbdf390473 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -107,6 +107,8 @@
 using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
 using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
 class StoragePolicySelector;
 using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
+struct PartUUIDs;
+using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
 
 class IOutputFormat;
 using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@@ -264,6 +266,9 @@ private:
     using SampleBlockCache = std::unordered_map<std::string, Block>;
     mutable SampleBlockCache sample_block_cache;
 
+    std::shared_ptr<PartUUIDs> part_uuids;         /// set of part UUIDs, used for query parts deduplication
+    std::shared_ptr<PartUUIDs> ignored_part_uuids; /// set of part UUIDs that are meant to be excluded from query processing
+
     NameToNameMap query_parameters;   /// Dictionary with query parameters for prepared statements.
                                       /// (key=name, value)
 
@@ -734,6 +739,9 @@ public:
     };
 
     MySQLWireContext mysql;
+
+    PartUUIDsPtr getPartUUIDs();
+    PartUUIDsPtr getIgnoredPartUUIDs();
 private:
     std::unique_lock<std::recursive_mutex> getLock() const;
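Note: both getters follow the same lazily-initialized, lock-protected pattern, so a query context carries its two UUID sets only when deduplication is actually exercised. The same pattern in isolation (generic T in place of PartUUIDs, a plain mutex in place of the context lock):

#include <memory>
#include <mutex>

/// Generic form of Context::getPartUUIDs()/getIgnoredPartUUIDs():
/// create the shared set on first access, under the lock.
template <typename T>
class LazyShared
{
public:
    std::shared_ptr<T> get()
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (!value)
            value = std::make_shared<T>();
        return value;
    }

private:
    std::mutex mutex;
    std::shared_ptr<T> value;
};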
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index 12d1a0249b7..0d040652342 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
 #include
 #include
 #include
@@ -180,10 +181,16 @@ void TCPHandler::runImpl()
 
         /** If Query - process it. If Ping or Cancel - go back to the beginning.
          *  There may come settings for a separate query that modify `query_context`.
+         *  It's possible to receive a part UUIDs packet before the query, in which case receivePacket has to be called twice.
          */
         if (!receivePacket())
             continue;
 
+        /** If a part UUIDs packet was received before the query, try to read the next packet again.
+          */
+        if (state.empty() && state.part_uuids && !receivePacket())
+            continue;
+
         query_scope.emplace(*query_context);
 
         send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
@@ -528,6 +535,8 @@ void TCPHandler::processOrdinaryQuery()
     /// Pull query execution result, if exists, and send it to network.
     if (state.io.in)
     {
+        sendPartUUIDs();
+
         /// This allows the client to prepare output format
         if (Block header = state.io.in->getHeader())
             sendData(header);
@@ -592,6 +601,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
 {
     auto & pipeline = state.io.pipeline;
 
+    sendPartUUIDs();
+
     /// Send header-block, to allow client to prepare output format for data to send.
     {
         const auto & header = pipeline.getHeader();
@@ -693,6 +704,20 @@ void TCPHandler::receiveUnexpectedTablesStatusRequest()
     throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
 }
 
+void TCPHandler::sendPartUUIDs()
+{
+    auto uuids = query_context->getPartUUIDs()->get();
+    if (!uuids.empty())
+    {
+        for (const auto & uuid : uuids)
+            LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
+
+        writeVarUInt(Protocol::Server::PartUUIDs, *out);
+        writeVectorBinary(uuids, *out);
+        out->next();
+    }
+}
+
 void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
 {
     writeVarUInt(Protocol::Server::ProfileInfo, *out);
@@ -905,6 +930,10 @@ bool TCPHandler::receivePacket()
     switch (packet_type)
     {
+        case Protocol::Client::IgnoredPartUUIDs:
+            /// A part UUIDs packet, if there is one, comes before the query.
+            receiveIgnoredPartUUIDs();
+            return true;
         case Protocol::Client::Query:
             if (!state.empty())
                 receiveUnexpectedQuery();
@@ -940,6 +969,16 @@
     }
 }
 
+void TCPHandler::receiveIgnoredPartUUIDs()
+{
+    state.part_uuids = true;
+    std::vector<UUID> uuids;
+    readVectorBinary(uuids, *in);
+
+    if (!uuids.empty())
+        query_context->getIgnoredPartUUIDs()->add(uuids);
+}
+
 void TCPHandler::receiveClusterNameAndSalt()
 {
     readStringBinary(cluster, *in);
diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h
index 0d3109a6591..41539bef1e1 100644
--- a/src/Server/TCPHandler.h
+++ b/src/Server/TCPHandler.h
@@ -67,6 +67,9 @@ struct QueryState
     /// Temporary tables read
     bool temporary_tables_read = false;
 
+    /// Whether the state has received part UUIDs to exclude from the query
+    bool part_uuids = false;
+
     /// Request requires data from client for function input()
     bool need_receive_data_for_input = false;
     /// temporary place for incoming data block for input()
@@ -173,6 +176,7 @@ private:
     void receiveHello();
     bool receivePacket();
     void receiveQuery();
+    void receiveIgnoredPartUUIDs();
     bool receiveData(bool scalar);
     bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
     void readData(const Settings & connection_settings);
@@ -201,6 +205,7 @@ private:
     void sendProgress();
     void sendLogs();
     void sendEndOfStream();
+    void sendPartUUIDs();
     void sendProfileInfo(const BlockStreamProfileInfo & info);
     void sendTotals(const Block & totals);
     void sendExtremes(const Block & extremes);
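Note: the handshake ordering is the subtle part here — the client may send at most one IgnoredPartUUIDs packet, and only before the Query packet; that is why runImpl() calls receivePacket() a second time when state.part_uuids is set. A toy model of that acceptance rule (packet ids as in Protocol::Client; the helper name is hypothetical):

#include <cstdint>

enum class ClientPacket : uint64_t { Query = 1, IgnoredPartUUIDs = 8 };

/// Returns true when the packet sequence forms a valid query start:
/// either Query directly, or exactly one IgnoredPartUUIDs followed by Query.
template <typename ReceiveFn>   /// ReceiveFn: () -> ClientPacket
bool receiveQueryStart(ReceiveFn receive)
{
    ClientPacket packet = receive();
    if (packet == ClientPacket::IgnoredPartUUIDs)
        packet = receive();     /// the second receivePacket() call in runImpl()
    return packet == ClientPacket::Query;
}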
diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp
index c852151f27d..ce60856505e 100644
--- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp
+++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include <DataTypes/DataTypeUUID.h>
 
 namespace DB
@@ -205,6 +206,7 @@ namespace
         virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
         virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
+        virtual void insertUUIDColumn(const ColumnPtr & column, const String & name) = 0;
     };
 }
@@ -241,6 +243,16 @@ static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inserter, ...)
 
             inserter.insertUInt64Column(column, virtual_column_name);
         }
+        else if (virtual_column_name == "_part_uuid")
+        {
+            ColumnPtr column;
+            if (rows)
+                column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst();
+            else
+                column = DataTypeUUID().createColumn();
+
+            inserter.insertUUIDColumn(column, virtual_column_name);
+        }
         else if (virtual_column_name == "_partition_id")
         {
             ColumnPtr column;
@@ -271,6 +283,11 @@ namespace
             block.insert({column, std::make_shared<DataTypeUInt64>(), name});
         }
 
+        void insertUUIDColumn(const ColumnPtr & column, const String & name) final
+        {
+            block.insert({column, std::make_shared<DataTypeUUID>(), name});
+        }
+
         Block & block;
     };
@@ -288,6 +305,10 @@ namespace
             columns.push_back(column);
         }
 
+        void insertUUIDColumn(const ColumnPtr & column, const String &) final
+        {
+            columns.push_back(column);
+        }
         Columns & columns;
     };
 }
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 9ed751cbc8e..56e6033d18e 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <DataTypes/DataTypeUUID.h>
 #include
 #include
 #include
@@ -3949,6 +3950,7 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
     return NamesAndTypesList{
         NameAndTypePair("_part", std::make_shared<DataTypeString>()),
         NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
+        NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
         NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
         NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
     };
diff --git a/src/Storages/MergeTree/MergeTreeDataPartUUID.cpp b/src/Storages/MergeTree/MergeTreeDataPartUUID.cpp
new file mode 100644
index 00000000000..17d19855798
--- /dev/null
+++ b/src/Storages/MergeTree/MergeTreeDataPartUUID.cpp
@@ -0,0 +1,38 @@
+#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
+
+namespace DB
+{
+
+std::vector<UUID> PartUUIDs::add(const std::vector<UUID> & new_uuids)
+{
+    std::lock_guard lock(mutex);
+    std::vector<UUID> intersection;
+
+    /// First check each new UUID for presence in the accumulated set; return the duplicates back if any
+    for (const auto & uuid : new_uuids)
+    {
+        if (uuids.find(uuid) != uuids.end())
+            intersection.emplace_back(uuid);
+    }
+
+    if (intersection.empty())
+    {
+        for (const auto & uuid : new_uuids)
+            uuids.emplace(uuid);
+    }
+    return intersection;
+}
+
+std::vector<UUID> PartUUIDs::get() const
+{
+    std::lock_guard lock(mutex);
+    return std::vector<UUID>(uuids.begin(), uuids.end());
+}
+
+bool PartUUIDs::has(const UUID & uuid) const
+{
+    std::lock_guard lock(mutex);
+    return uuids.find(uuid) != uuids.end();
+}
+
+}
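Note one subtlety of PartUUIDs::add(): it is all-or-nothing. When any duplicate is detected, none of the new UUIDs are inserted and the duplicates are returned to the caller. A usage sketch of that contract (assuming DB::UUIDHelpers::generateV4() from Core/UUID.h for sample values):

#include <cassert>
#include <vector>
#include <Core/UUID.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>

int main()
{
    DB::PartUUIDs set;
    const DB::UUID a = DB::UUIDHelpers::generateV4();
    const DB::UUID b = DB::UUIDHelpers::generateV4();

    assert(set.add({a}).empty());                            /// first insertion: no duplicates
    assert((set.add({a, b}) == std::vector<DB::UUID>{a}));   /// `a` is reported back...
    assert(!set.has(b));                                     /// ...and `b` was NOT inserted either
    return 0;
}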
diff --git a/src/Storages/MergeTree/MergeTreeDataPartUUID.h b/src/Storages/MergeTree/MergeTreeDataPartUUID.h
new file mode 100644
index 00000000000..ee3a9ee2791
--- /dev/null
+++ b/src/Storages/MergeTree/MergeTreeDataPartUUID.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <unordered_set>
+#include <vector>
+
+namespace DB
+{
+
+/** PartUUIDs is a set of UUIDs used to control query deduplication.
+  * The object is used in the query context in both directions:
+  *   Server -> Client: to send all parts' UUIDs that have been read during the query;
+  *   Client -> Server: to exclude the specified parts from processing.
+  *
+  * The current implementation assumes the user setting allow_experimental_query_deduplication=1 is set.
+  */
+struct PartUUIDs
+{
+public:
+    /// Add new UUIDs if no duplicates are found; otherwise return the duplicated UUIDs
+    std::vector<UUID> add(const std::vector<UUID> & uuids);
+    /// Get accumulated UUIDs
+    std::vector<UUID> get() const;
+    bool has(const UUID & uuid) const;
+
+private:
+    mutable std::mutex mutex;
+    std::unordered_set<UUID> uuids;
+};
+
+using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
+
+}
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 457c9c04aa9..740288e3b46 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
 #include
 #include
 #include
@@ -35,8 +36,10 @@
 #include
 #include
+#include <Columns/ColumnsNumber.h>
 #include
 #include
+#include <DataTypes/DataTypeUUID.h>
 #include
 #include
 
@@ -61,6 +64,7 @@ namespace ErrorCodes
     extern const int TOO_MANY_ROWS;
     extern const int CANNOT_PARSE_TEXT;
     extern const int TOO_MANY_PARTITIONS;
+    extern const int DUPLICATED_PART_UUIDS;
 }
 
@@ -71,14 +75,27 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
 
 /// Construct a block consisting only of possible values of virtual columns
-static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
+static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
 {
-    auto column = ColumnString::create();
+    auto part_column = ColumnString::create();
+    auto part_uuid_column = ColumnUUID::create();
 
     for (const auto & part : parts)
-        column->insert(part->name);
+    {
+        part_column->insert(part->name);
+        if (with_uuid)
+            part_uuid_column->insert(part->uuid);
+    }
 
-    return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
+    if (with_uuid)
+    {
+        return Block(std::initializer_list<ColumnWithTypeAndName>{
+            ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
+            ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
+        });
+    }
+
+    return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
 }
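Note: the `_part_uuid` virtual column appears in two places — materialized per read task in injectVirtualColumnsImpl() (earlier in this patch), and pre-computed per part here, where getBlockWithVirtualPartColumns() builds a block that the WHERE clause can be evaluated against before any granule is read. The per-part constant-column trick in isolation (mirrors the patch; assumes ClickHouse's DataTypeUUID API):

#include <Columns/IColumn.h>
#include <DataTypes/DataTypeUUID.h>

/// A constant column carrying the part's UUID, expanded to `rows` real values;
/// for an empty range an empty column of the same type is produced instead.
DB::ColumnPtr makePartUUIDColumn(size_t rows, const DB::UUID & part_uuid)
{
    if (rows)
        return DB::DataTypeUUID().createColumnConst(rows, part_uuid)
            ->convertToFullColumnIfConst();
    return DB::DataTypeUUID().createColumn();
}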
@@ -162,6 +179,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
     Names real_column_names;
 
     bool part_column_queried = false;
+    bool part_uuid_column_queried = false;
     bool sample_factor_column_queried = false;
     Float64 used_sample_factor = 1;
 
@@ -181,6 +199,11 @@
         {
             virt_column_names.push_back(name);
         }
+        else if (name == "_part_uuid")
+        {
+            part_uuid_column_queried = true;
+            virt_column_names.push_back(name);
+        }
         else if (name == "_sample_factor")
         {
             sample_factor_column_queried = true;
@@ -198,9 +221,9 @@
     if (real_column_names.empty())
         real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
 
-    /// If `_part` virtual column is requested, we try to use it as an index.
-    Block virtual_columns_block = getBlockWithPartColumn(parts);
-    if (part_column_queried)
+    /// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
+    Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
+    if (part_column_queried || part_uuid_column_queried)
         VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
 
     auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
@@ -246,36 +269,88 @@
     /// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
     ///  as well as `max_block_number_to_read`.
+    /// Populate part UUIDs to the query context if any, and skip parts whose UUIDs are marked as excluded.
     {
-        auto prev_parts = parts;
-        parts.clear();
+        Context & query_context
+            = context.hasQueryContext() ? const_cast<Context &>(context).getQueryContext() : const_cast<Context &>(context);
 
-        for (const auto & part : prev_parts)
+        /// select_parts prepares the parts that have to be read for the query;
+        /// it returns false if duplicated parts' UUIDs have been met
+        auto select_parts = [&] (MergeTreeData::DataPartsVector & selected_parts) -> bool
         {
-            if (part_values.find(part->name) == part_values.end())
-                continue;
+            auto ignored_part_uuids = query_context.getIgnoredPartUUIDs();
+            std::unordered_set<UUID> temp_part_uuids;
 
-            if (part->isEmpty())
-                continue;
+            auto prev_parts = selected_parts;
+            selected_parts.clear();
 
-            if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
-                    part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
-                continue;
-
-            if (partition_pruner)
+            for (const auto & part : prev_parts)
             {
-                if (partition_pruner->canBePruned(part))
+                if (part_values.find(part->name) == part_values.end())
                     continue;
+
+                if (part->isEmpty())
+                    continue;
+
+                if (minmax_idx_condition
+                    && !minmax_idx_condition->checkInHyperrectangle(part->minmax_idx.hyperrectangle, data.minmax_idx_column_types)
+                           .can_be_true)
+                    continue;
+
+                if (partition_pruner)
+                {
+                    if (partition_pruner->canBePruned(part))
+                        continue;
+                }
+
+                if (max_block_numbers_to_read)
+                {
+                    auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
+                    if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
+                        continue;
+                }
+
+                /// populate UUIDs and exclude ignored parts if enabled
+                if (query_context.getSettingsRef().allow_experimental_query_deduplication && part->uuid != UUIDHelpers::Nil)
+                {
+                    /// Skip the part if its uuid is meant to be excluded
+                    if (ignored_part_uuids->has(part->uuid))
+                        continue;
+
+                    auto result = temp_part_uuids.insert(part->uuid);
+                    if (!result.second)
+                        throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR);
+                }
+
+                selected_parts.push_back(part);
             }
 
-            if (max_block_numbers_to_read)
+            if (!temp_part_uuids.empty())
             {
-                auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
-                if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
-                    continue;
+                auto duplicates = query_context.getPartUUIDs()->add(std::vector<UUID>{temp_part_uuids.begin(), temp_part_uuids.end()});
+                if (!duplicates.empty())
+                {
+                    /// On a local replica with prefer_localhost_replica=1, if any duplicates appeared during the first pass,
+                    /// add them to the exclusion set so that they will be skipped on the second pass
+                    query_context.getIgnoredPartUUIDs()->add(duplicates);
+                    return false;
+                }
             }
 
-            parts.push_back(part);
+            return true;
+        };
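Note, before the lambda is invoked below, the two-pass contract in one place: pass one registers the selected parts' UUIDs in the query-context set and, on collision, moves the duplicates into the ignored set and reports failure; pass two then skips them. A self-contained model of that flow, with integer ids standing in for UUIDs:

#include <stdexcept>
#include <unordered_set>
#include <vector>

struct DedupSets
{
    std::unordered_set<int> seen;     /// ~ query_context.getPartUUIDs()
    std::unordered_set<int> ignored;  /// ~ query_context.getIgnoredPartUUIDs()
};

/// One selection pass: returns false if duplicates were found
/// (they are moved to `ignored` so that the next pass skips them).
bool selectParts(const std::vector<int> & all, DedupSets & sets, std::vector<int> & out)
{
    out.clear();
    for (int id : all)
        if (!sets.ignored.count(id))
            out.push_back(id);

    /// All-or-nothing commit, as in PartUUIDs::add(): on any collision with
    /// already-registered ids, nothing is inserted and the colliding ids
    /// become ignored for the next pass.
    std::vector<int> duplicates;
    for (int id : out)
        if (sets.seen.count(id))
            duplicates.push_back(id);

    if (duplicates.empty())
    {
        sets.seen.insert(out.begin(), out.end());
        return true;
    }
    sets.ignored.insert(duplicates.begin(), duplicates.end());
    return false;
}

void selectWithRetry(const std::vector<int> & all, DedupSets & sets, std::vector<int> & out)
{
    if (!selectParts(all, sets, out) && !selectParts(all, sets, out))
        throw std::runtime_error("Found duplicate UUIDs while processing query.");
}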
+        /// Process parts that have to be read for the query.
+        auto needs_retry = !select_parts(parts);
+
+        /// If any duplicated part UUIDs were met during the first step, try to ignore them in a second pass
+        if (needs_retry)
+        {
+            if (log)
+                LOG_DEBUG(log, "Found duplicate uuids locally, will retry part selection without them");
+
+            /// If the second attempt didn't help either, throw an exception
+            if (!select_parts(parts))
+                throw Exception("Found duplicate UUIDs while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
+        }
     }
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 5227cd8a33e..570aeef820d 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -4,6 +4,7 @@
 #include
 #include
+#include <DataTypes/DataTypeUUID.h>
 #include
 #include
 
@@ -345,6 +346,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
         NameAndTypePair("_table", std::make_shared<DataTypeString>()),
         NameAndTypePair("_part", std::make_shared<DataTypeString>()),
         NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
+        NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
         NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
         NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
         NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()),
diff --git a/src/Storages/ya.make b/src/Storages/ya.make
index 69e319cbad5..dbf37e58695 100644
--- a/src/Storages/ya.make
+++ b/src/Storages/ya.make
@@ -48,6 +48,7 @@ SRCS(
     MergeTree/MergeTreeDataPartInMemory.cpp
     MergeTree/MergeTreeDataPartTTLInfo.cpp
     MergeTree/MergeTreeDataPartType.cpp
+    MergeTree/MergeTreeDataPartUUID.cpp
     MergeTree/MergeTreeDataPartWide.cpp
     MergeTree/MergeTreeDataPartWriterCompact.cpp
     MergeTree/MergeTreeDataPartWriterInMemory.cpp
diff --git a/tests/integration/test_query_deduplication/__init__.py b/tests/integration/test_query_deduplication/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_query_deduplication/configs/deduplication_settings.xml b/tests/integration/test_query_deduplication/configs/deduplication_settings.xml
new file mode 100644
index 00000000000..8369c916848
--- /dev/null
+++ b/tests/integration/test_query_deduplication/configs/deduplication_settings.xml
@@ -0,0 +1,5 @@
+<yandex>
+    <merge_tree>
+        <assign_part_uuids>1</assign_part_uuids>
+    </merge_tree>
+</yandex>
diff --git a/tests/integration/test_query_deduplication/configs/remote_servers.xml b/tests/integration/test_query_deduplication/configs/remote_servers.xml
new file mode 100644
index 00000000000..f12558ca529
--- /dev/null
+++ b/tests/integration/test_query_deduplication/configs/remote_servers.xml
@@ -0,0 +1,24 @@
+<yandex>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+            <shard>
+                <replica>
+                    <host>node3</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</yandex>
diff --git a/tests/integration/test_query_deduplication/test.py b/tests/integration/test_query_deduplication/test.py
new file mode 100644
index 00000000000..8d935b98579
--- /dev/null
+++ b/tests/integration/test_query_deduplication/test.py
@@ -0,0 +1,165 @@
+import uuid
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import TSV
+
+DUPLICATED_UUID = uuid.uuid4()
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance(
+    'node1',
+    main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
+
+node2 = cluster.add_instance(
+    'node2',
+    main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
+
+node3 = cluster.add_instance(
+    'node3',
+    main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def prepare_node(node, parts_uuid=None):
node.query(""" + CREATE TABLE t(_prefix UInt8 DEFAULT 0, key UInt64, value UInt64) + ENGINE MergeTree() + ORDER BY tuple() + PARTITION BY _prefix + SETTINGS index_granularity = 1 + """) + + node.query(""" + CREATE TABLE d AS t ENGINE=Distributed(test_cluster, default, t) + """) + + # Stop merges while populating test data + node.query("SYSTEM STOP MERGES") + + # Create 5 parts + for i in range(1, 6): + node.query("INSERT INTO t VALUES ({}, {}, {})".format(i, i, i)) + + node.query("DETACH TABLE t") + + if parts_uuid: + for part, part_uuid in parts_uuid: + script = """ + echo -n '{}' > /var/lib/clickhouse/data/default/t/{}/uuid.txt + """.format(part_uuid, part) + node.exec_in_container(["bash", "-c", script]) + + # Attach table back + node.query("ATTACH TABLE t") + + # NOTE: + # due to absence of the ability to lock part, need to operate on parts with preventin merges + # node.query("SYSTEM START MERGES") + # node.query("OPTIMIZE TABLE t FINAL") + + print(node.name) + print(node.query("SELECT name, uuid, partition FROM system.parts WHERE table = 't' AND active ORDER BY name")) + + assert '5' == node.query("SELECT count() FROM system.parts WHERE table = 't' AND active").strip() + if parts_uuid: + for part, part_uuid in parts_uuid: + assert '1' == node.query( + "SELECT count() FROM system.parts WHERE table = 't' AND uuid = '{}' AND active".format( + part_uuid)).strip() + + +@pytest.fixture(scope="module") +def prepared_cluster(started_cluster): + print("duplicated UUID: {}".format(DUPLICATED_UUID)) + prepare_node(node1, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)]) + prepare_node(node2, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)]) + prepare_node(node3) + + +def test_virtual_column(prepared_cluster): + # Part containing `key=3` has the same fingerprint on both nodes, + # we expect it to be included only once in the end result.; + # select query is using virtucal column _part_fingerprint to filter out part in one shard + expected = """ + 1 2 + 2 2 + 3 1 + 4 2 + 5 2 + """ + assert TSV(expected) == TSV(node1.query(""" + SELECT + key, + count() AS c + FROM d + WHERE ((_shard_num = 1) AND (_part_uuid != '{}')) OR (_shard_num = 2) + GROUP BY key + ORDER BY + key ASC + """.format(DUPLICATED_UUID))) + + +def test_with_deduplication(prepared_cluster): + # Part containing `key=3` has the same fingerprint on both nodes, + # we expect it to be included only once in the end result + expected = """ +1 3 +2 3 +3 2 +4 3 +5 3 +""" + assert TSV(expected) == TSV(node1.query( + "SET allow_experimental_query_deduplication=1; SELECT key, count() c FROM d GROUP BY key ORDER BY key")) + + +def test_no_merge_with_deduplication(prepared_cluster): + # Part containing `key=3` has the same fingerprint on both nodes, + # we expect it to be included only once in the end result. 
+    # even with distributed_group_by_no_merge=1: the duplicated part should be excluded from the final result
+    expected = """
+1 1
+2 1
+3 1
+4 1
+5 1
+1 1
+2 1
+3 1
+4 1
+5 1
+1 1
+2 1
+4 1
+5 1
+"""
+    assert TSV(expected) == TSV(node1.query("SELECT key, count() c FROM d GROUP BY key ORDER BY key", settings={
+        "allow_experimental_query_deduplication": 1,
+        "distributed_group_by_no_merge": 1,
+    }))
+
+
+def test_without_deduplication(prepared_cluster):
+    # Part containing `key=3` has the same fingerprint on both nodes,
+    # but allow_experimental_query_deduplication is disabled,
+    # so it will not be excluded
+    expected = """
+1 3
+2 3
+3 3
+4 3
+5 3
+"""
+    assert TSV(expected) == TSV(node1.query(
+        "SET allow_experimental_query_deduplication=0; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))