CLICKHOUSE-606: query deduplication based on parts' UUID

* add query data deduplication that excludes duplicated parts in MergeTree family engines.

Query deduplication is based on parts' UUIDs, which must first be enabled with the merge_tree setting
assign_part_uuids=1
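
A minimal sketch of enabling this, assuming the MergeTree setting can also be set per table
(table name is illustrative; the integration tests below enable the same setting server-wide
through the merge_tree config section):

    CREATE TABLE t (key UInt64, value UInt64)
    ENGINE = MergeTree()
    ORDER BY key
    SETTINGS assign_part_uuids = 1;

    -- newly written parts get a UUID, visible in system.parts
    SELECT name, uuid FROM system.parts WHERE table = 't' AND active;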

The allow_experimental_query_deduplication setting enables part deduplication; it defaults to false.

A data part UUID is a mechanism for giving a data part a unique identifier.
Having UUIDs and a deduplication mechanism opens up the possibility of moving parts
between shards while preserving data consistency on the read path:
duplicated UUIDs will cause the root executor to retry the query against one of the replicas,
explicitly asking it to exclude the encountered duplicated fingerprints during distributed query execution.
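
A hedged sketch of that read path, reusing the Distributed table d defined in the integration
test at the end of this commit:

    SET allow_experimental_query_deduplication = 1;

    -- If the same part UUID shows up on two shards, the initiator cancels the
    -- query, reconnects, and retries it on one replica, sending the duplicated
    -- UUIDs in an IgnoredPartUUIDs packet so that replica skips those parts.
    SELECT key, count() AS c FROM d GROUP BY key ORDER BY key;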

NOTE: this implementation doesn't provide any knobs to lock a part and hence its UUID. Any mutation/merge will
update the part's UUID.

* add the _part_uuid virtual column, allowing UUIDs to be used in predicates.
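
For example, mirroring the test_virtual_column test below (the UUID literal is a placeholder),
a concrete part can be excluded explicitly:

    SELECT key, value
    FROM t
    WHERE _part_uuid != '00000000-0000-0000-0000-000000000000';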

Signed-off-by: Aleksei Semiglazov <asemiglazov@cloudflare.com>

address comments
Author: Aleksei Semiglazov, 2020-11-20 17:23:53 +00:00
parent b9647e5326
commit 921518db0a
27 changed files with 607 additions and 28 deletions

@@ -1900,6 +1900,9 @@ private:
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
return true;
case Protocol::Server::Data:
if (!cancelled)
onData(packet.block);

@@ -542,6 +542,12 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
throttler->add(out->count() - prev_bytes);
}
void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
writeVarUInt(Protocol::Client::IgnoredPartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->next();
}
void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
{
@@ -798,6 +804,10 @@ Packet Connection::receivePacket(std::function<void(Poco::Net::Socket &)> async_
case Protocol::Server::EndOfStream:
return res;
case Protocol::Server::PartUUIDs:
readVectorBinary(res.part_uuids, *in);
return res;
default:
/// In unknown state, disconnect - to not leave unsynchronised connection.
disconnect();

@@ -66,6 +66,7 @@ struct Packet
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
std::vector<UUID> part_uuids;
Packet() : type(Protocol::Server::Hello) {}
};
@@ -157,6 +158,8 @@ public:
void sendScalarsData(Scalars & data);
/// Send all contents of external (temporary) tables.
void sendExternalTablesData(ExternalTablesData & data);
/// Send parts' uuids to exclude them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.

@@ -140,6 +140,21 @@ void MultiplexedConnections::sendQuery(
sent_query = true;
}
void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
std::lock_guard lock(cancel_mutex);
if (sent_query)
throw Exception("Cannot send uuids after query is sent.", ErrorCodes::LOGICAL_ERROR);
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
connection->sendIgnoredPartUUIDs(uuids);
}
}
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
@@ -195,6 +210,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
@@ -253,6 +269,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(std::function<void(Poco::Ne
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:

@@ -50,6 +50,9 @@ public:
/// Send a request to the replica to cancel the request
void sendCancel();
/// Send parts' uuids to replicas to exclude them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
/** On each replica, read and skip all packets to EndOfStream or Exception.
* Returns EndOfStream if no exception has been received. Otherwise
* returns the last received packet of type Exception.

@@ -26,4 +26,6 @@ using ColumnInt256 = ColumnVector<Int256>;
using ColumnFloat32 = ColumnVector<Float32>;
using ColumnFloat64 = ColumnVector<Float64>;
using ColumnUUID = ColumnVector<UInt128>;
}

@@ -533,6 +533,7 @@
M(564, INTERSERVER_SCHEME_DOESNT_MATCH) \
M(565, TOO_MANY_PARTITIONS) \
M(566, CANNOT_RMDIR) \
M(567, DUPLICATED_PART_UUIDS) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

@@ -75,8 +75,9 @@ namespace Protocol
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
PartUUIDs = 12, /// List of unique parts ids.
MAX = TableColumns,
MAX = PartUUIDs,
};
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@@ -98,6 +99,7 @@ namespace Protocol
"TablesStatusResponse",
"Log",
"TableColumns",
"PartUUIDs",
};
return packet <= MAX
? data[packet]
@@ -132,8 +134,9 @@ namespace Protocol
TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6, /// Keep the connection alive
Scalar = 7, /// A block of data (compressed or not).
IgnoredPartUUIDs = 8, /// List of unique parts ids to exclude from query processing
MAX = Scalar,
MAX = IgnoredPartUUIDs,
};
inline const char * toString(UInt64 packet)
@@ -147,6 +150,7 @@ namespace Protocol
"TablesStatusRequest",
"KeepAlive",
"Scalar",
"IgnoredPartUUIDs",
};
return packet <= MAX
? data[packet]

@@ -420,6 +420,8 @@ class IColumn;
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
\
M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) functions to the countIf() function when logically equivalent", 0) \
M(Bool, allow_experimental_query_deduplication, false, "Allow sending parts' UUIDs for a query in order to deduplicate data parts if any", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \

@@ -13,6 +13,7 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Common/FiberStack.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace DB
{
@@ -20,6 +21,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int DUPLICATED_PART_UUIDS;
}
RemoteQueryExecutor::RemoteQueryExecutor(
@@ -158,6 +160,7 @@ void RemoteQueryExecutor::sendQuery()
std::lock_guard guard(was_cancelled_mutex);
established = true;
was_cancelled = false;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context.getClientInfo();
@@ -167,6 +170,14 @@ void RemoteQueryExecutor::sendQuery()
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
}
{
std::lock_guard lock(duplicated_part_uuids_mutex);
if (!duplicated_part_uuids.empty())
{
multiplexed_connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
}
}
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
established = false;
@@ -195,7 +206,29 @@ Block RemoteQueryExecutor::read()
Packet packet = multiplexed_connections->receivePacket();
if (auto block = processPacket(std::move(packet)))
{
if (got_duplicated_part_uuids)
{
/// Cancel previous query and disconnect before retry.
cancel();
multiplexed_connections->disconnect();
/// Only resend once, otherwise throw an exception
if (!resent_query)
{
if (log)
LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
resent_query = true;
sent_query = false;
got_duplicated_part_uuids = false;
/// The subsequent read will implicitly send the query first.
return read();
}
throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
}
return *block;
}
}
}
@@ -233,7 +266,29 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
else
{
if (auto data = processPacket(std::move(read_context->packet)))
{
if (got_duplicated_part_uuids)
{
/// Cancel previous query and disconnect before retry.
cancel();
multiplexed_connections->disconnect();
/// Only resend once, otherwise throw an exception
if (!resent_query)
{
if (log)
LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
resent_query = true;
sent_query = false;
got_duplicated_part_uuids = false;
/// The subsequent read will implicitly send the query first.
return read(read_context);
}
throw Exception("Found duplicate uuids while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
}
return std::move(*data);
}
}
}
while (true);
@@ -246,6 +301,13 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
{
switch (packet.type)
{
case Protocol::Server::PartUUIDs:
if (!setPartUUIDs(packet.part_uuids))
{
got_duplicated_part_uuids = true;
return Block();
}
break;
case Protocol::Server::Data:
/// If the block is not empty and is not a header block
if (packet.block && (packet.block.rows() > 0))
@@ -306,6 +368,20 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
return {};
}
bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
{
Context & query_context = const_cast<Context &>(context).getQueryContext();
auto duplicates = query_context.getPartUUIDs()->add(uuids);
if (!duplicates.empty())
{
std::lock_guard lock(duplicated_part_uuids_mutex);
duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end());
return false;
}
return true;
}
void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
{
/** If one of:
@@ -383,6 +459,7 @@ void RemoteQueryExecutor::sendExternalTables()
{
std::lock_guard lock(external_tables_mutex);
external_tables_data.clear();
external_tables_data.reserve(count);
for (size_t i = 0; i < count; ++i)
@@ -446,7 +523,7 @@ bool RemoteQueryExecutor::isQueryPending() const
bool RemoteQueryExecutor::hasThrownException() const
{
return got_exception_from_replica || got_unknown_packet_from_replica;
return got_exception_from_replica || got_unknown_packet_from_replica || got_duplicated_part_uuids;
}
}

@@ -57,6 +57,9 @@ public:
/// Create connection and send query, external tables and scalars.
void sendQuery();
/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };
/// Read next block of data. Returns empty block if query is finished.
Block read();
@@ -152,6 +155,14 @@ private:
*/
std::atomic<bool> got_unknown_packet_from_replica { false };
/** Got duplicated uuids from replica
*/
std::atomic<bool> got_duplicated_part_uuids{ false };
/// Parts uuids, collected from remote replicas
std::mutex duplicated_part_uuids_mutex;
std::vector<UUID> duplicated_part_uuids;
PoolMode pool_mode = PoolMode::GET_MANY;
StorageID main_table = StorageID::createEmpty();
@@ -163,6 +174,10 @@ private:
/// Send all temporary tables to remote servers
void sendExternalTables();
/** Add part uuids, collected from remote replicas, to the query context.
* Returns false if duplicates were found.
*/
bool setPartUUIDs(const std::vector<UUID> & uuids);
/// If wasn't sent yet, send request to cancel all connections to replicas
void tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context);
@@ -174,6 +189,9 @@ private:
/// Process packet for read and return data block if possible.
std::optional<Block> processPacket(Packet packet);
/// Reads packet by packet
Block readPackets();
};
}

@@ -910,6 +910,7 @@ inline void writeBinary(const StringRef & x, WriteBuffer & buf) { writeStringBin
inline void writeBinary(const std::string_view & x, WriteBuffer & buf) { writeStringBinary(x, buf); }
inline void writeBinary(const Int128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const UInt128 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const UUID & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const DummyUInt256 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const Decimal32 & x, WriteBuffer & buf) { writePODBinary(x, buf); }
inline void writeBinary(const Decimal64 & x, WriteBuffer & buf) { writePODBinary(x, buf); }

@@ -64,6 +64,7 @@
#include <Common/RemoteHostFilter.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace ProfileEvents
@@ -2510,4 +2511,22 @@ StorageID Context::resolveStorageIDImpl(StorageID storage_id, StorageNamespace w
return StorageID::createEmpty();
}
PartUUIDsPtr Context::getPartUUIDs()
{
auto lock = getLock();
if (!part_uuids)
part_uuids = std::make_shared<PartUUIDs>();
return part_uuids;
}
PartUUIDsPtr Context::getIgnoredPartUUIDs()
{
auto lock = getLock();
if (!ignored_part_uuids)
ignored_part_uuids = std::make_shared<PartUUIDs>();
return ignored_part_uuids;
}
}

@@ -107,6 +107,8 @@ using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
struct PartUUIDs;
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@@ -264,6 +266,9 @@ private:
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
std::shared_ptr<PartUUIDs> part_uuids; /// set of parts' uuids, used for query parts deduplication
std::shared_ptr<PartUUIDs> ignored_part_uuids; /// set of parts' uuids that are meant to be excluded from query processing
NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
/// (key=name, value)
@@ -734,6 +739,9 @@ public:
};
MySQLWireContext mysql;
PartUUIDsPtr getPartUUIDs();
PartUUIDsPtr getIgnoredPartUUIDs();
private:
std::unique_lock<std::recursive_mutex> getLock() const;

@@ -24,6 +24,7 @@
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Core/ExternalTable.h>
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
@@ -180,10 +181,16 @@ void TCPHandler::runImpl()
/** If Query - process it. If Ping or Cancel - go back to the beginning.
* There may come settings for a separate query that modify `query_context`.
* It's possible to receive a part uuids packet before the query, in which case receivePacket has to be called twice.
*/
if (!receivePacket())
continue;
/** If part uuids were received in the previous packet, try to read the query packet again.
*/
if (state.empty() && state.part_uuids && !receivePacket())
continue;
query_scope.emplace(*query_context);
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
@@ -528,6 +535,8 @@ void TCPHandler::processOrdinaryQuery()
/// Pull query execution result, if exists, and send it to network.
if (state.io.in)
{
sendPartUUIDs();
/// This allows the client to prepare output format
if (Block header = state.io.in->getHeader())
sendData(header);
@@ -592,6 +601,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
{
auto & pipeline = state.io.pipeline;
sendPartUUIDs();
/// Send header-block, to allow client to prepare output format for data to send.
{
const auto & header = pipeline.getHeader();
@@ -693,6 +704,20 @@ void TCPHandler::receiveUnexpectedTablesStatusRequest()
throw NetException("Unexpected packet TablesStatusRequest received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}
void TCPHandler::sendPartUUIDs()
{
auto uuids = query_context->getPartUUIDs()->get();
if (!uuids.empty())
{
for (const auto & uuid : uuids)
LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
writeVarUInt(Protocol::Server::PartUUIDs, *out);
writeVectorBinary(uuids, *out);
out->next();
}
}
void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
@@ -905,6 +930,10 @@ bool TCPHandler::receivePacket()
switch (packet_type)
{
case Protocol::Client::IgnoredPartUUIDs:
/// A part uuids packet, if any, arrives before the query.
receiveIgnoredPartUUIDs();
return true;
case Protocol::Client::Query:
if (!state.empty())
receiveUnexpectedQuery();
@ -940,6 +969,16 @@ bool TCPHandler::receivePacket()
}
}
void TCPHandler::receiveIgnoredPartUUIDs()
{
state.part_uuids = true;
std::vector<UUID> uuids;
readVectorBinary(uuids, *in);
if (!uuids.empty())
query_context->getIgnoredPartUUIDs()->add(uuids);
}
void TCPHandler::receiveClusterNameAndSalt()
{
readStringBinary(cluster, *in);

@@ -67,6 +67,9 @@ struct QueryState
/// Temporary tables read
bool temporary_tables_read = false;
/// Whether the state has received part uuids to exclude from query processing
bool part_uuids = false;
/// Request requires data from client for function input()
bool need_receive_data_for_input = false;
/// temporary place for incoming data block for input()
@@ -173,6 +176,7 @@ private:
void receiveHello();
bool receivePacket();
void receiveQuery();
void receiveIgnoredPartUUIDs();
bool receiveData(bool scalar);
bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
void readData(const Settings & connection_settings);
@@ -201,6 +205,7 @@ private:
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendPartUUIDs();
void sendProfileInfo(const BlockStreamProfileInfo & info);
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);

@@ -7,6 +7,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeUUID.h>
namespace DB
@@ -205,6 +206,7 @@ namespace
virtual void insertStringColumn(const ColumnPtr & column, const String & name) = 0;
virtual void insertUInt64Column(const ColumnPtr & column, const String & name) = 0;
virtual void insertUUIDColumn(const ColumnPtr & column, const String & name) = 0;
};
}
@@ -241,6 +243,16 @@ static void injectVirtualColumnsImpl(size_t rows, VirtualColumnsInserter & inser
inserter.insertUInt64Column(column, virtual_column_name);
}
else if (virtual_column_name == "_part_uuid")
{
ColumnPtr column;
if (rows)
column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst();
else
column = DataTypeUUID().createColumn();
inserter.insertUUIDColumn(column, virtual_column_name);
}
else if (virtual_column_name == "_partition_id")
{
ColumnPtr column;
@@ -271,6 +283,11 @@ namespace
block.insert({column, std::make_shared<DataTypeUInt64>(), name});
}
void insertUUIDColumn(const ColumnPtr & column, const String & name) final
{
block.insert({column, std::make_shared<DataTypeUUID>(), name});
}
Block & block;
};
@@ -288,6 +305,10 @@ namespace
columns.push_back(column);
}
void insertUUIDColumn(const ColumnPtr & column, const String &) final
{
columns.push_back(column);
}
Columns & columns;
};
}

@@ -4,6 +4,7 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/NestedUtils.h>
@@ -3949,6 +3950,7 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
return NamesAndTypesList{
NameAndTypePair("_part", std::make_shared<DataTypeString>()),
NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
};

@@ -0,0 +1,38 @@
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
namespace DB
{
std::vector<UUID> PartUUIDs::add(const std::vector<UUID> & new_uuids)
{
std::lock_guard lock(mutex);
std::vector<UUID> intersection;
/// First check whether any of the new uuids are already present in the set; return the duplicates if so
for (const auto & uuid : new_uuids)
{
if (uuids.find(uuid) != uuids.end())
intersection.emplace_back(uuid);
}
if (intersection.empty())
{
for (const auto & uuid : new_uuids)
uuids.emplace(uuid);
}
return intersection;
}
std::vector<UUID> PartUUIDs::get() const
{
std::lock_guard lock(mutex);
return std::vector<UUID>(uuids.begin(), uuids.end());
}
bool PartUUIDs::has(const UUID & uuid) const
{
std::lock_guard lock(mutex);
return uuids.find(uuid) != uuids.end();
}
}

@@ -0,0 +1,34 @@
#pragma once
#include <memory>
#include <mutex>
#include <unordered_set>
#include <Core/UUID.h>
namespace DB
{
/** PartUUIDs is a uuid set used to control query deduplication.
* The object is used in the query context in both directions:
* Server->Client to send all parts' UUIDs that have been read during the query
* Client->Server to exclude the specified parts from processing.
*
* The current implementation assumes the user setting allow_experimental_query_deduplication=1 is enabled.
*/
struct PartUUIDs
{
public:
/// Add new UUIDs if no duplicates are found; otherwise return the duplicated UUIDs
std::vector<UUID> add(const std::vector<UUID> & uuids);
/// Get accumulated UUIDs
std::vector<UUID> get() const;
bool has(const UUID & uuid) const;
private:
mutable std::mutex mutex;
std::unordered_set<UUID> uuids;
};
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
}

@@ -15,6 +15,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/ReadInOrderOptimizer.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@@ -35,8 +36,10 @@
#include <Processors/QueryPlan/MergingFinal.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/VirtualColumnUtils.h>
@@ -61,6 +64,7 @@ namespace ErrorCodes
extern const int TOO_MANY_ROWS;
extern const int CANNOT_PARSE_TEXT;
extern const int TOO_MANY_PARTITIONS;
extern const int DUPLICATED_PART_UUIDS;
}
@@ -71,14 +75,27 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
/// Construct a block consisting only of possible values of virtual columns
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
{
auto column = ColumnString::create();
auto part_column = ColumnString::create();
auto part_uuid_column = ColumnUUID::create();
for (const auto & part : parts)
column->insert(part->name);
{
part_column->insert(part->name);
if (with_uuid)
part_uuid_column->insert(part->uuid);
}
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_part")};
if (with_uuid)
{
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
});
}
return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
}
@@ -162,6 +179,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
Names real_column_names;
bool part_column_queried = false;
bool part_uuid_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
@@ -181,6 +199,11 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
virt_column_names.push_back(name);
}
else if (name == "_part_uuid")
{
part_uuid_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
{
sample_factor_column_queried = true;
@@ -198,9 +221,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` virtual column is requested, we try to use it as an index.
Block virtual_columns_block = getBlockWithPartColumn(parts);
if (part_column_queried)
/// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
if (part_column_queried || part_uuid_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
@@ -246,36 +269,88 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
/// as well as `max_block_number_to_read`.
/// Record parts' uuids, if any, in the query context, and skip parts whose uuids are marked as excluded.
{
auto prev_parts = parts;
parts.clear();
Context & query_context
= context.hasQueryContext() ? const_cast<Context &>(context).getQueryContext() : const_cast<Context &>(context);
for (const auto & part : prev_parts)
/// select_parts prepares the parts that have to be read for the query;
/// it returns false if duplicated parts' UUIDs were encountered
auto select_parts = [&] (MergeTreeData::DataPartsVector & selected_parts) -> bool
{
if (part_values.find(part->name) == part_values.end())
continue;
auto ignored_part_uuids = query_context.getIgnoredPartUUIDs();
std::unordered_set<UUID> temp_part_uuids;
if (part->isEmpty())
continue;
auto prev_parts = selected_parts;
selected_parts.clear();
if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
part->minmax_idx.hyperrectangle, data.minmax_idx_column_types).can_be_true)
continue;
if (partition_pruner)
for (const auto & part : prev_parts)
{
if (partition_pruner->canBePruned(part))
if (part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
continue;
if (minmax_idx_condition
&& !minmax_idx_condition->checkInHyperrectangle(part->minmax_idx.hyperrectangle, data.minmax_idx_column_types)
.can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
/// populate UUIDs and exclude ignored parts if enabled
if (query_context.getSettingsRef().allow_experimental_query_deduplication && part->uuid != UUIDHelpers::Nil)
{
/// Skip the part if its uuid is meant to be excluded
if (ignored_part_uuids->has(part->uuid))
continue;
auto result = temp_part_uuids.insert(part->uuid);
if (!result.second)
throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR);
}
selected_parts.push_back(part);
}
if (max_block_numbers_to_read)
if (!temp_part_uuids.empty())
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
auto duplicates = query_context.getPartUUIDs()->add(std::vector<UUID>{temp_part_uuids.begin(), temp_part_uuids.end()});
if (!duplicates.empty())
{
/// On a local replica with prefer_localhost_replica=1, if any duplicates appeared during the first pass,
/// add them to the exclusion set so they will be skipped on the second pass
query_context.getIgnoredPartUUIDs()->add(duplicates);
return false;
}
}
parts.push_back(part);
return true;
};
/// Process parts that have to be read for a query.
auto needs_retry = !select_parts(parts);
/// If any duplicated part UUIDs were encountered during the first pass, try to ignore them in the second pass
if (needs_retry)
{
if (log)
LOG_DEBUG(log, "Found duplicate uuids locally, will retry part selection without them");
/// If the second attempt doesn't help either, throw an exception
if (!select_parts(parts))
throw Exception("Found duplicate UUIDs while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);
}
}

@@ -4,6 +4,7 @@
#include <Disks/IDisk.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/Distributed/DistributedBlockOutputStream.h>
@@ -345,6 +346,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
NameAndTypePair("_table", std::make_shared<DataTypeString>()),
NameAndTypePair("_part", std::make_shared<DataTypeString>()),
NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>()),
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()),

@@ -48,6 +48,7 @@ SRCS(
MergeTree/MergeTreeDataPartInMemory.cpp
MergeTree/MergeTreeDataPartTTLInfo.cpp
MergeTree/MergeTreeDataPartType.cpp
MergeTree/MergeTreeDataPartUUID.cpp
MergeTree/MergeTreeDataPartWide.cpp
MergeTree/MergeTreeDataPartWriterCompact.cpp
MergeTree/MergeTreeDataPartWriterInMemory.cpp

@@ -0,0 +1,5 @@
<yandex>
<merge_tree>
<assign_part_uuids>1</assign_part_uuids>
</merge_tree>
</yandex>

@@ -0,0 +1,24 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

@@ -0,0 +1,165 @@
import uuid
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
DUPLICATED_UUID = uuid.uuid4()
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
'node1',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
node2 = cluster.add_instance(
'node2',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
node3 = cluster.add_instance(
'node3',
main_configs=['configs/remote_servers.xml', 'configs/deduplication_settings.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def prepare_node(node, parts_uuid=None):
node.query("""
CREATE TABLE t(_prefix UInt8 DEFAULT 0, key UInt64, value UInt64)
ENGINE MergeTree()
ORDER BY tuple()
PARTITION BY _prefix
SETTINGS index_granularity = 1
""")
node.query("""
CREATE TABLE d AS t ENGINE=Distributed(test_cluster, default, t)
""")
# Stop merges while populating test data
node.query("SYSTEM STOP MERGES")
# Create 5 parts
for i in range(1, 6):
node.query("INSERT INTO t VALUES ({}, {}, {})".format(i, i, i))
node.query("DETACH TABLE t")
if parts_uuid:
for part, part_uuid in parts_uuid:
script = """
echo -n '{}' > /var/lib/clickhouse/data/default/t/{}/uuid.txt
""".format(part_uuid, part)
node.exec_in_container(["bash", "-c", script])
# Attach table back
node.query("ATTACH TABLE t")
# NOTE:
# due to the absence of the ability to lock parts, we need to operate on parts while preventing merges
# node.query("SYSTEM START MERGES")
# node.query("OPTIMIZE TABLE t FINAL")
print(node.name)
print(node.query("SELECT name, uuid, partition FROM system.parts WHERE table = 't' AND active ORDER BY name"))
assert '5' == node.query("SELECT count() FROM system.parts WHERE table = 't' AND active").strip()
if parts_uuid:
for part, part_uuid in parts_uuid:
assert '1' == node.query(
"SELECT count() FROM system.parts WHERE table = 't' AND uuid = '{}' AND active".format(
part_uuid)).strip()
@pytest.fixture(scope="module")
def prepared_cluster(started_cluster):
print("duplicated UUID: {}".format(DUPLICATED_UUID))
prepare_node(node1, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)])
prepare_node(node2, parts_uuid=[("3_3_3_0", DUPLICATED_UUID)])
prepare_node(node3)
def test_virtual_column(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# we expect it to be included only once in the end result.
# The select query uses the virtual column _part_uuid to filter out the part in one shard
expected = """
1 2
2 2
3 1
4 2
5 2
"""
assert TSV(expected) == TSV(node1.query("""
SELECT
key,
count() AS c
FROM d
WHERE ((_shard_num = 1) AND (_part_uuid != '{}')) OR (_shard_num = 2)
GROUP BY key
ORDER BY
key ASC
""".format(DUPLICATED_UUID)))
def test_with_deduplication(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# we expect it to be included only once in the end result
expected = """
1 3
2 3
3 2
4 3
5 3
"""
assert TSV(expected) == TSV(node1.query(
"SET allow_experimental_query_deduplication=1; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))
def test_no_merge_with_deduplication(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# we expect it to be included only once in the end result.
# even with distributed_group_by_no_merge=1 the duplicated part should be excluded from the final result
expected = """
1 1
2 1
3 1
4 1
5 1
1 1
2 1
3 1
4 1
5 1
1 1
2 1
4 1
5 1
"""
assert TSV(expected) == TSV(node1.query("SELECT key, count() c FROM d GROUP BY key ORDER BY key", settings={
"allow_experimental_query_deduplication": 1,
"distributed_group_by_no_merge": 1,
}))
def test_without_deduplication(prepared_cluster):
# Part containing `key=3` has the same fingerprint on both nodes,
# but allow_experimental_query_deduplication is disabled,
# so it will not be excluded
expected = """
1 3
2 3
3 3
4 3
5 3
"""
assert TSV(expected) == TSV(node1.query(
"SET allow_experimental_query_deduplication=0; SELECT key, count() c FROM d GROUP BY key ORDER BY key"))