Merge pull request #17348 from xjewer/alex/CLICKHOUSE-606_deduplication_UUID

CLICKHOUSE-606: query deduplication based on parts' UUID
alesapin 2021-02-05 22:47:34 +03:00 committed by GitHub
commit 011109c82a
28 changed files with 678 additions and 43 deletions

View File

@ -1900,6 +1900,9 @@ private:
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
return true;
case Protocol::Server::Data:
if (!cancelled)
onData(packet.block);

View File

@ -542,6 +542,12 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
throttler->add(out->count() - prev_bytes);
}
void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
writeVarUInt(Protocol::Client::IgnoredPartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->next();
}
void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
{
@ -798,6 +804,10 @@ Packet Connection::receivePacket(std::function<void(Poco::Net::Socket &)> async_
case Protocol::Server::EndOfStream:
return res;
case Protocol::Server::PartUUIDs:
readVectorBinary(res.part_uuids, *in);
return res;
default:
/// In unknown state, disconnect - to not leave unsynchronised connection.
disconnect();

View File

@ -66,6 +66,7 @@ struct Packet
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
std::vector<UUID> part_uuids;
Packet() : type(Protocol::Server::Hello) {}
};
@ -157,6 +158,8 @@ public:
void sendScalarsData(Scalars & data);
/// Send all contents of external (temporary) tables.
void sendExternalTablesData(ExternalTablesData & data);
/// Send parts' uuids to exclude them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.

View File

@ -140,6 +140,21 @@ void MultiplexedConnections::sendQuery(
sent_query = true;
}
void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
std::lock_guard lock(cancel_mutex);
if (sent_query)
throw Exception("Cannot send uuids after query is sent.", ErrorCodes::LOGICAL_ERROR);
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
connection->sendIgnoredPartUUIDs(uuids);
}
}
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
@ -195,6 +210,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
@ -253,6 +269,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(std::function<void(Poco::Ne
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:

View File

@ -50,6 +50,9 @@ public:
/// Send a request to the replica to cancel the request
void sendCancel();
/// Send parts' uuids to replicas to exclude them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
/** On each replica, read and skip all packets to EndOfStream or Exception.
* Returns EndOfStream if no exception has been received. Otherwise
* returns the last received packet of type Exception.

View File

@ -26,4 +26,6 @@ using ColumnInt256 = ColumnVector<Int256>;
using ColumnFloat32 = ColumnVector<Float32>;
using ColumnFloat64 = ColumnVector<Float64>;
using ColumnUUID = ColumnVector<UInt128>;
}

View File

@ -533,6 +533,7 @@
M(564, INTERSERVER_SCHEME_DOESNT_MATCH) \
M(565, TOO_MANY_PARTITIONS) \
M(566, CANNOT_RMDIR) \
M(567, DUPLICATED_PART_UUIDS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -75,8 +75,9 @@ namespace Protocol
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
PartUUIDs = 12, /// List of unique part ids.
MAX = TableColumns,
MAX = PartUUIDs,
};
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@ -98,6 +99,7 @@ namespace Protocol
"TablesStatusResponse",
"Log",
"TableColumns",
"PartUUIDs",
};
return packet <= MAX
? data[packet]
@ -132,8 +134,9 @@ namespace Protocol
TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6, /// Keep the connection alive
Scalar = 7, /// A block of data (compressed or not).
IgnoredPartUUIDs = 8, /// List of unique part ids to exclude from query processing
MAX = Scalar,
MAX = IgnoredPartUUIDs,
};
inline const char * toString(UInt64 packet)
@ -147,6 +150,7 @@ namespace Protocol
"TablesStatusRequest",
"KeepAlive",
"Scalar",
"IgnoredPartUUIDs",
};
return packet <= MAX
? data[packet]
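
Bumping MAX together with the new packet ids matters because toString() indexes a fixed name table and only falls back to the unknown-packet string above MAX. A small sanity sketch of the expected behaviour (standalone illustration, not part of the diff; it assumes the Protocol.h enums and helpers shown above):

#include <Core/Protocol.h>
#include <cassert>
#include <cstring>

void checkProtocolNames()
{
    using namespace DB;
    /// The new packet ids resolve to readable names because MAX was raised alongside them.
    assert(std::strcmp(Protocol::Server::toString(Protocol::Server::PartUUIDs), "PartUUIDs") == 0);
    assert(std::strcmp(Protocol::Client::toString(Protocol::Client::IgnoredPartUUIDs), "IgnoredPartUUIDs") == 0);
}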

View File

@ -420,6 +420,8 @@ class IColumn;
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
\
M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) functions to countIf() function when logically equivalent", 0) \
M(Bool, allow_experimental_query_deduplication, false, "Allow sending parts' UUIDs for a query in order to deduplicate data parts if any", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \

View File

@ -13,6 +13,7 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Common/FiberStack.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace DB
{
@ -20,6 +21,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int DUPLICATED_PART_UUIDS;
}
RemoteQueryExecutor::RemoteQueryExecutor(
@ -158,6 +160,7 @@ void RemoteQueryExecutor::sendQuery()
std::lock_guard guard(was_cancelled_mutex);
established = true;
was_cancelled = false;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context.getClientInfo();
@ -167,6 +170,12 @@ void RemoteQueryExecutor::sendQuery()
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
}
{
std::lock_guard lock(duplicated_part_uuids_mutex);
if (!duplicated_part_uuids.empty())
multiplexed_connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
}
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
established = false;
@ -196,6 +205,8 @@ Block RemoteQueryExecutor::read()
if (auto block = processPacket(std::move(packet)))
return *block;
else if (got_duplicated_part_uuids)
return std::get<Block>(restartQueryWithoutDuplicatedUUIDs());
}
}
@ -211,7 +222,7 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
return Block();
}
if (!read_context)
if (!read_context || resent_query)
{
std::lock_guard lock(was_cancelled_mutex);
if (was_cancelled)
@ -234,6 +245,8 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
{
if (auto data = processPacket(std::move(read_context->packet)))
return std::move(*data);
else if (got_duplicated_part_uuids)
return restartQueryWithoutDuplicatedUUIDs(&read_context);
}
}
while (true);
@ -242,10 +255,39 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
#endif
}
std::variant<Block, int> RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs(std::unique_ptr<ReadContext> * read_context)
{
/// Cancel previous query and disconnect before retry.
cancel(read_context);
multiplexed_connections->disconnect();
/// Only resend once, otherwise throw an exception
if (!resent_query)
{
if (log)
LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
resent_query = true;
sent_query = false;
got_duplicated_part_uuids = false;
/// The subsequent read will implicitly send the query first.
if (!read_context)
return read();
else
return read(*read_context);
}
throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
}
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
{
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
if (!setPartUUIDs(packet.part_uuids))
got_duplicated_part_uuids = true;
break;
case Protocol::Server::Data:
/// If the block is not empty and is not a header block
if (packet.block && (packet.block.rows() > 0))
@ -306,6 +348,20 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
return {};
}
bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
{
Context & query_context = const_cast<Context &>(context).getQueryContext();
auto duplicates = query_context.getPartUUIDs()->add(uuids);
if (!duplicates.empty())
{
std::lock_guard lock(duplicated_part_uuids_mutex);
duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end());
return false;
}
return true;
}
void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
{
/** If one of:
@ -383,6 +439,7 @@ void RemoteQueryExecutor::sendExternalTables()
{
std::lock_guard lock(external_tables_mutex);
external_tables_data.clear();
external_tables_data.reserve(count);
for (size_t i = 0; i < count; ++i)

View File

@ -57,6 +57,9 @@ public:
/// Create connection and send query, external tables and scalars.
void sendQuery();
/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };
/// Read next block of data. Returns empty block if query is finished.
Block read();
@ -152,6 +155,14 @@ private:
*/
std::atomic<bool> got_unknown_packet_from_replica { false };
/** Got duplicated uuids from replica
*/
std::atomic<bool> got_duplicated_part_uuids{ false };
/// Parts uuids, collected from remote replicas
std::mutex duplicated_part_uuids_mutex;
std::vector<UUID> duplicated_part_uuids;
PoolMode pool_mode = PoolMode::GET_MANY;
StorageID main_table = StorageID::createEmpty();
@ -163,6 +174,14 @@ private:
/// Send all temporary tables to remote servers
void sendExternalTables();
/// Set part uuids to a query context, collected from remote replicas.
/// Returns false if duplicates were found.
bool setPartUUIDs(const std::vector<UUID> & uuids);
/// Cancel the query and restart it with info about duplicated UUIDs;
/// used only with `allow_experimental_query_deduplication`.
std::variant<Block, int> restartQueryWithoutDuplicatedUUIDs(std::unique_ptr<ReadContext> * read_context = nullptr);
/// If wasn't sent yet, send request to cancel all connections to replicas
void tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context);
@ -174,6 +193,10 @@ private:
/// Process packet for read and return data block if possible.
std::optional<Block> processPacket(Packet packet);
/// Reads packet by packet
Block readPackets();
};
}
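
The retry logic declared above reduces to one rule on the initiator: the first time duplicated part UUIDs are reported, remember them, cancel, and resend the query once (announcing the duplicates via IgnoredPartUUIDs before the query); a second report raises DUPLICATED_PART_UUIDS. A self-contained model of just that decision, with plain standard types standing in for the executor's members (the names here are illustrative, not the executor's API):

#include <stdexcept>
#include <vector>

/// Toy stand-in for the executor's bookkeeping; `int` replaces DB::UUID.
struct RetryDecision
{
    std::vector<int> ignored_uuids;   /// duplicates to announce before the resent query
    bool resent = false;

    /// Returns true if the query should be cancelled and resent without these parts.
    bool onDuplicatedUUIDs(const std::vector<int> & duplicates)
    {
        if (duplicates.empty())
            return false;             /// nothing to do, keep reading packets
        if (resent)
            throw std::runtime_error("Found duplicate uuids while processing query.");
        ignored_uuids.insert(ignored_uuids.end(), duplicates.begin(), duplicates.end());
        resent = true;                /// only one restart is allowed
        return true;
    }
};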

View File

@ -910,6 +910,7 @@ inline void writeBinary(const StringRef & x, WriteBuffer & buf) { writeStringBin
inline void writeBinary(const std::string_view & x, WriteBuffer & buf) { writeStringBinary(x, buf); }
inline void writeBinary(const Int128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const UInt128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const UUID & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const DummyUInt256 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const Decimal32 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const Decimal64 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
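
With the UUID overload in place, vectors of UUIDs round-trip through the same generic binary helpers the protocol code in this change relies on. A minimal sketch (it assumes the matching readBinary(UUID) overload on the read side and the usual in-memory buffer helpers):

#include <cassert>
#include <vector>
#include <Core/UUID.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

void uuidVectorRoundTrip()
{
    using namespace DB;
    std::vector<UUID> uuids{UUIDHelpers::generateV4(), UUIDHelpers::generateV4()};

    WriteBufferFromOwnString out;
    writeVectorBinary(uuids, out);    /// size as varint, then each UUID via writeBinary

    ReadBufferFromString in(out.str());
    std::vector<UUID> restored;
    readVectorBinary(restored, in);

    assert(restored == uuids);
}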

View File

@ -64,6 +64,7 @@
#include <Common/RemoteHostFilter.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace ProfileEvents
@ -2510,4 +2511,22 @@ StorageID Context::resolveStorageIDImpl(StorageID storage_id, StorageNamespace w
return StorageID::createEmpty();
}
PartUUIDsPtr Context::getPartUUIDs()
{
auto lock = getLock();
if (!part_uuids)
part_uuids = std::make_shared<PartUUIDs>();
return part_uuids;
}
PartUUIDsPtr Context::getIgnoredPartUUIDs()
{
auto lock = getLock();
if (!ignored_part_uuids)
ignored_part_uuids = std::make_shared<PartUUIDs>();
return ignored_part_uuids;
}
}

View File

@ -107,6 +107,8 @@ using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
struct PartUUIDs;
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@ -264,6 +266,9 @@ private:
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
PartUUIDsPtr part_uuids; /// set of parts' uuids, is used for query parts deduplication
PartUUIDsPtr ignored_part_uuids; /// set of parts' uuids are meant to be excluded from query processing
NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
/// (key=name, value)
@ -734,6 +739,9 @@ public:
};
MySQLWireContext mysql;
PartUUIDsPtr getPartUUIDs();
PartUUIDsPtr getIgnoredPartUUIDs();
private:
std::unique_lock<std::recursive_mutex> getLock() const;

View File

@ -24,6 +24,7 @@
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Core/ExternalTable.h>
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -180,10 +181,16 @@ void TCPHandler::runImpl()
/** If Query - process it. If Ping or Cancel - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
* It's possible to receive a part UUIDs packet before the query, in which case receivePacket has to be called twice.
*/
if (!receivePacket())
continue;
/** If part UUIDs were received in the previous packet, try to read the query packet again.
*/
if (state.empty() && state.part_uuids && !receivePacket())
continue;
query_scope.emplace(*query_context);
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
@ -528,6 +535,10 @@ void TCPHandler::processOrdinaryQuery()
/// Pull query execution result, if exists, and send it to network.
if (state.io.in)
{
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
sendPartUUIDs();
/// This allows the client to prepare output format
if (Block header = state.io.in->getHeader())
sendData(header);
@ -592,6 +603,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
{
auto & pipeline = state.io.pipeline;
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
sendPartUUIDs();
/// Send header-block, to allow client to prepare output format for data to send.
{
const auto & header = pipeline.getHeader();
@ -693,6 +707,20 @@ void TCPHandler::receiveUnexpectedTablesStatusRequest()
throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}
void TCPHandler::sendPartUUIDs()
{
auto uuids = query_context->getPartUUIDs()->get();
if (!uuids.empty())
{
for (const auto & uuid : uuids)
LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
writeVarUInt(Protocol::Server::PartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->next();
}
}
void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
@ -905,6 +933,10 @@ bool TCPHandler::receivePacket()
switch (packet_type)
{
case Protocol::Client::IgnoredPartUUIDs:
/// The part UUIDs packet, if any, arrives before the query.
receiveIgnoredPartUUIDs();
return true;
case Protocol::Client::Query:
if (!state.empty())
receiveUnexpectedQuery();
@ -940,6 +972,16 @@ bool TCPHandler::receivePacket()
}
}
void TCPHandler::receiveIgnoredPartUUIDs()
{
state.part_uuids = true;
std::vector<UUID> uuids;
readVectorBinary(uuids, *in);
if (!uuids.empty())
query_context->getIgnoredPartUUIDs()->add(uuids);
}
void TCPHandler::receiveClusterNameAndSalt()
{
readStringBinary(cluster, *in);

View File

@ -67,6 +67,9 @@ struct QueryState
/// Temporary tables read
bool temporary_tables_read = false;
/// Whether the state has received part UUIDs to exclude from the query
bool part_uuids = false;
/// Request requires data from client for function input()
bool need_receive_data_for_input = false;
/// temporary place for incoming data block for input()
@ -173,6 +176,7 @@ private:
void receiveHello();
bool receivePacket();
void receiveQuery();
void receiveIgnoredPartUUIDs();
bool receiveData(bool scalar);
bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
void readData(const Settings & connection_settings);
@ -201,6 +205,7 @@ private:
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendPartUUIDs();
void sendProfileInfo(const BlockStreamProfileInfo & info);
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);

View File

@ -7,6 +7,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
namespace DB
@ -205,6 +206,7 @@ namespace
virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
virtual void insertUUIDColumn(const ColumnPtr & column, const String & name) = 0;
};
}
@ -241,6 +243,16 @@ static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inser
inserter.insertUInt64Column(column, virtual_column_name);
}
else if (virtual_column_name == "_part_uuid")
{
ColumnPtr column;
if (rows)
column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst();
else
column = DataTypeUUID().createColumn();
inserter.insertUUIDColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_id")
{
ColumnPtr column;
@ -271,6 +283,11 @@ namespace
block.insert({column, std::make_shared<DataTypeUInt64>(), name});
}
void insertUUIDColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUUID>(), name});
}
Block & block;
};
@ -288,6 +305,10 @@ namespace
columns.push_back(column);
}
void insertUUIDColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
Columns & columns;
};
}

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/NestedUtils.h>
@ -3951,6 +3952,7 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
return NamesAndTypesList{
NameAndTypePair("_part", std::make_shared<DataTypeString>()),
NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
};

View File

@ -0,0 +1,38 @@
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace DB
{
std::vector<UUID> PartUUIDs::add(const std::vector<UUID> & new_uuids)
{
std::lock_guard lock(mutex);
std::vector<UUID> intersection;
/// First check whether any of the new uuids are already present in the set; return the duplicates if so
for (const auto & uuid : new_uuids)
{
if (uuids.find(uuid) != uuids.end())
intersection.emplace_back(uuid);
}
if (intersection.empty())
{
for (const auto & uuid : new_uuids)
uuids.emplace(uuid);
}
return intersection;
}
std::vector<UUID> PartUUIDs::get() const
{
std::lock_guard lock(mutex);
return std::vector<UUID>(uuids.begin(), uuids.end());
}
bool PartUUIDs::has(const UUID & uuid) const
{
std::lock_guard lock(mutex);
return uuids.find(uuid) != uuids.end();
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <memory>
#include <mutex>
#include <unordered_set>
#include <Core/UUID.h>
namespace DB
{
/** PartUUIDs is a set of UUIDs used to control query deduplication.
* The object is used in the query context in both directions:
* Server->Client to send all parts' UUIDs that have been read during the query
* Client->Server to exclude the specified parts from processing.
*
* The current implementation assumes the setting allow_experimental_query_deduplication=1 is enabled.
*/
struct PartUUIDs
{
public:
/// Add new UUIDs if no duplicates are found; otherwise return the duplicated UUIDs
std::vector<UUID> add(const std::vector<UUID> & uuids);
/// Get accumulated UUIDs
std::vector<UUID> get() const;
bool has(const UUID & uuid) const;
private:
mutable std::mutex mutex;
std::unordered_set<UUID> uuids;
};
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
}
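
The intended semantics of the set: add() either stores the whole batch (when nothing overlaps) or stores nothing and reports the overlap. A small standalone illustration against the interface above (not part of the change; it assumes UUIDHelpers::generateV4 from Core/UUID.h):

#include <cassert>
#include <Core/UUID.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>

void partUUIDsSemantics()
{
    using namespace DB;
    PartUUIDs seen;

    const UUID a = UUIDHelpers::generateV4();
    const UUID b = UUIDHelpers::generateV4();

    /// First batch: no duplicates, both UUIDs are stored.
    assert(seen.add({a, b}).empty());
    assert(seen.has(a) && seen.has(b));

    /// Overlapping batch: the intersection is returned and nothing new is inserted.
    const UUID c = UUIDHelpers::generateV4();
    const auto duplicates = seen.add({a, c});
    assert(duplicates.size() == 1 && duplicates.front() == a);
    assert(!seen.has(c));
}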

View File

@ -6,7 +6,6 @@
#include <Poco/File.h>
#include <Common/FieldVisitors.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
@ -15,6 +14,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -35,8 +35,10 @@
#include <Processors/QueryPlan/MergingFinal.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/VirtualColumnUtils.h>
@ -61,6 +63,7 @@ namespace ErrorCodes
extern const int TOO_MANY_ROWS;
extern const int CANNOT_PARSE_TEXT;
extern const int TOO_MANY_PARTITIONS;
extern const int DUPLICATED_PART_UUIDS;
}
@ -71,14 +74,27 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
/// Construct a block consisting only of possible values of virtual columns
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
{
auto column = ColumnString::create();
auto part_column = ColumnString::create();
auto part_uuid_column = ColumnUUID::create();
for (const auto & part : parts)
column->insert(part->name);
{
part_column->insert(part->name);
if (with_uuid)
part_uuid_column->insert(part->uuid);
}
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
if (with_uuid)
{
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
});
}
return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
}
@ -162,6 +178,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
Names real_column_names;
bool part_column_queried = false;
bool part_uuid_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
@ -181,6 +198,11 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
virt_column_names.push_back(name);
}
else if (name == "_part_uuid")
{
part_uuid_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
{
sample_factor_column_queried = true;
@ -198,9 +220,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` virtual column is requested, we try to use it as an index.
Block virtual_columns_block = getBlockWithPartColumn(parts);
if (part_column_queried)
/// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
if (part_column_queried || part_uuid_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
@ -244,40 +266,13 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
}
}
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
/// as well as `max_block_number_to_read`.
{
auto prev_parts = parts;
parts.clear();
const Context & query_context = context.hasQueryContext() ? context.getQueryContext() : context;
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
continue;
if (query_context.getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(parts, part_values, minmax_idx_condition, partition_pruner, max_block_numbers_to_read, query_context);
else
selectPartsToRead(parts, part_values, minmax_idx_condition, partition_pruner, max_block_numbers_to_read);
if (part->isEmpty())
continue;
if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
parts.push_back(part);
}
}
/// Sampling.
Names column_names_to_read = real_column_names;
@ -1849,5 +1844,134 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
return res;
}
void MergeTreeDataSelectExecutor::selectPartsToRead(
MergeTreeData::DataPartsVector & parts,
const std::unordered_set<String> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
auto prev_parts = parts;
parts.clear();
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
continue;
if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
parts.push_back(part);
}
}
void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
const std::unordered_set<String> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
const Context & query_context) const
{
/// const_cast to add UUIDs to context. Bad practice.
Context & non_const_context = const_cast<Context &>(query_context);
/// select_parts prepares the parts that have to be read for the query;
/// it returns false if duplicated part UUIDs were encountered
auto select_parts = [&] (MergeTreeData::DataPartsVector & selected_parts) -> bool
{
auto ignored_part_uuids = non_const_context.getIgnoredPartUUIDs();
std::unordered_set<UUID> temp_part_uuids;
auto prev_parts = selected_parts;
selected_parts.clear();
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
continue;
if (minmax_idx_condition
&& !minmax_idx_condition->checkInHyperrectangle(part->minmax_idx.hyperrectangle, data.minmax_idx_column_types)
.can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
/// populate UUIDs and exclude ignored parts if enabled
if (part->uuid != UUIDHelpers::Nil)
{
/// Skip the part if its uuid is meant to be excluded
if (ignored_part_uuids->has(part->uuid))
continue;
auto result = temp_part_uuids.insert(part->uuid);
if (!result.second)
throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR);
}
selected_parts.push_back(part);
}
if (!temp_part_uuids.empty())
{
auto duplicates = non_const_context.getPartUUIDs()->add(std::vector<UUID>{temp_part_uuids.begin(), temp_part_uuids.end()});
if (!duplicates.empty())
{
/// On a local replica with prefer_localhost_replica=1, if any duplicates appeared during the first pass,
/// add them to the exclusion set so they will be skipped on the second pass
non_const_context.getIgnoredPartUUIDs()->add(duplicates);
return false;
}
}
return true;
};
/// Process parts that have to be read for a query.
auto needs_retry = !select_parts(parts);
/// If any duplicated part UUIDs were encountered during the first step, try to ignore them on the second pass
if (needs_retry)
{
LOG_DEBUG(log, "Found duplicate uuids locally, will retry part selection without them");
/// If the second attempt didn't help either, throw an exception
if (!select_parts(parts))
throw Exception("Found duplicate UUIDs while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
}
}
}

View File

@ -4,6 +4,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/PartitionPruner.h>
namespace DB
@ -113,6 +114,24 @@ private:
const Settings & settings,
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log);
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
/// as well as `max_block_number_to_read`.
void selectPartsToRead(
MergeTreeData::DataPartsVector & parts,
const std::unordered_set<String> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read) const;
/// Same as the previous one, but also adds parts' UUIDs (if any) to the query context and skips parts whose UUIDs are marked as excluded.
void selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
const std::unordered_set<String> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
const Context & query_context) const;
};
}

View File

@ -4,6 +4,7 @@
#include <Disks/IDisk.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/Distributed/DistributedBlockOutputStream.h>
@ -345,6 +346,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
NameAndTypePair("_table", std::make_shared<DataTypeString>()),
NameAndTypePair("_part", std::make_shared<DataTypeString>()),
NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()),

View File

@ -48,6 +48,7 @@ SRCS(
MergeTree/MergeTreeDataPartInMemory.cpp
MergeTree/MergeTreeDataPartTTLInfo.cpp
MergeTree/MergeTreeDataPartType.cpp
MergeTree/MergeTreeDataPartUUID.cpp
MergeTree/MergeTreeDataPartWide.cpp
MergeTree/MergeTreeDataPartWriterCompact.cpp
MergeTree/MergeTreeDataPartWriterInMemory.cpp

View File

@ -0,0 +1,5 @@
<yandex>
<merge_tree>
<assign_part_uuids>1</assign_part_uuids>
</merge_tree>
</yandex>

View File

@ -0,0 +1,24 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,165 @@
import uuid
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
DUPLICATED_UUID = uuid.uuid4()
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
'node1',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
node2 = cluster.add_instance(
'node2',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
node3 = cluster.add_instance(
'node3',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def prepare_node(node, parts_uuid=None):
node.query("""
CREATE TABLE t(_prefix UInt8 DEFAULT 0, key UInt64, value UInt64)
ENGINE MergeTree()
ORDER BY tuple()
PARTITION BY _prefix
SETTINGS index_granularity = 1
""")
node.query("""
CREATE TABLE d AS t ENGINE=Distributed(test_cluster, default, t)
""")
# Stop merges while populating test data
node.query("SYSTEM STOP MERGES")
# Create 5 parts
for i in range(1, 6):
node.query("INSERT INTO t VALUES ({}, {}, {})".format(i, i, i))
node.query("DETACH TABLE t")
if parts_uuid:
for part, part_uuid in parts_uuid:
script = """
echo -n '{}' > /var/lib/clickhouse/data/default/t/{}/uuid.txt
""".format(part_uuid, part)
node.exec_in_container(["bash", "-c", script])
# Attach table back
node.query("ATTACH TABLE t")
# NOTE:
# due to the absence of the ability to lock a part, we need to operate on parts with merges prevented
# node.query("SYSTEM START MERGES")
# node.query("OPTIMIZE TABLE t FINAL")
print(node.name)
print(node.query("SELECT name, uuid, partition FROM system.parts WHERE table = 't' AND active ORDER BY name"))
assert '5' == node.query("SELECT count() FROM system.parts WHERE table = 't' AND active").strip()
if parts_uuid:
for part, part_uuid in parts_uuid:
assert '1' == node.query(
"SELECT count() FROM system.parts WHERE table = 't' AND uuid = '{}' AND active".format(
part_uuid)).strip()
@pytest.fixture(scope="module")
def prepared_cluster(started_cluster):
print("duplicated UUID: {}".format(DUPLICATED_UUID))
prepare_node(node1, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)])
prepare_node(node2, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)])
prepare_node(node3)
def test_virtual_column(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# we expect it to be included only once in the end result.
# The select query uses the virtual column _part_uuid to filter out the part in one shard.
expected = """
1 2
2 2
3 1
4 2
5 2
"""
assert TSV(expected) == TSV(node1.query("""
SELECT
key,
count() AS c
FROM d
WHERE ((_shard_num = 1) AND (_part_uuid != '{}')) OR (_shard_num = 2)
GROUP BY key
ORDER BY
key ASC
""".format(DUPLICATED_UUID)))
def test_with_deduplication(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# we expect it to be included only once in the end result
expected = """
1 3
2 3
3 2
4 3
5 3
"""
assert TSV(expected) == TSV(node1.query(
"SET allow_experimental_query_deduplication=1; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))
def test_no_merge_with_deduplication(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# we expect it to be included only once in the end result.
# even with distributed_group_by_no_merge=1 the duplicated part should be excluded from the final result
expected = """
1 1
2 1
3 1
4 1
5 1
1 1
2 1
3 1
4 1
5 1
1 1
2 1
4 1
5 1
"""
assert TSV(expected) == TSV(node1.query("SELECT key, count() c FROM d GROUP BY key ORDER BY key", settings={
"allow_experimental_query_deduplication": 1,
"distributed_group_by_no_merge": 1,
}))
def test_without_deduplication(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# but allow_experimental_query_deduplication is disabled,
# so it will not be excluded
expected = """
1 3
2 3
3 3
4 3
5 3
"""
assert TSV(expected) == TSV(node1.query(
"SET allow_experimental_query_deduplication=0; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))