INSERT now clamps settings from initiator to shard's constraints.

This commit is contained in:
Vitaly Baranov 2020-03-25 01:26:24 +03:00
parent 25f08e83c7
commit 34984d4a9a
6 changed files with 34 additions and 17 deletions

View File

@ -21,13 +21,14 @@ namespace ErrorCodes
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_)
: connection(connection_), query(query_), settings(settings_)
const Settings * settings_,
const ClientInfo * client_info_)
: connection(connection_), query(query_), settings(settings_), client_info(client_info_)
{
/** Send query and receive "header", that describe table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, client_info);
while (true)
{

View File

@ -22,7 +22,8 @@ public:
RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts,
const String & query_,
const Settings * settings_ = nullptr);
const Settings * settings_ = nullptr,
const ClientInfo * client_info_ = nullptr);
Block getHeader() const override { return header; }
@ -38,6 +39,7 @@ private:
Connection & connection;
String query;
const Settings * settings;
const ClientInfo * client_info;
Block header;
bool finished = false;
};

View File

@ -77,6 +77,10 @@ public:
bool empty() const { return query_kind == QueryKind::NO_QUERY; }
auto toTuple() const { return std::tie(current_query_id, initial_query_id, current_address, initial_address, query_kind, current_user, initial_user, current_password, interface, os_user, client_hostname, client_name, client_version_major, client_version_minor, client_version_patch, client_revision, http_method, http_user_agent, quota_key); }
friend bool operator==(const ClientInfo & lhs, const ClientInfo & rhs) { return lhs.toTuple() == rhs.toTuple(); }
friend bool operator!=(const ClientInfo & lhs, const ClientInfo & rhs) { return !(lhs == rhs); }
/** Serialization and deserialization.
* Only values that are not calculated automatically or passed separately are serialized.
* Revisions are passed to use format that server will understand or client was used.

View File

@ -279,9 +279,10 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
Settings insert_settings;
std::string insert_query;
readHeader(in, insert_settings, insert_query, log);
ClientInfo client_info;
readHeader(in, insert_settings, insert_query, client_info, log);
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings, &client_info};
remote.writePrefix();
remote.writePrepared(in);
@ -299,7 +300,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
}
void StorageDistributedDirectoryMonitor::readHeader(
ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log)
ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log)
{
UInt64 query_size;
readVarUInt(query_size, in);
@ -331,8 +332,11 @@ void StorageDistributedDirectoryMonitor::readHeader(
readStringBinary(insert_query, header_buf);
insert_settings.deserialize(header_buf);
if (header_buf.hasPendingData())
client_info.read(header_buf, initiator_revision);
/// Add handling new data here, for example:
/// if (initiator_revision >= DBMS_MIN_REVISION_WITH_MY_NEW_DATA)
/// if (header_buf.hasPendingData())
/// readVarUInt(my_new_data, header_buf);
return;
@ -353,18 +357,20 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
{
Settings settings;
String query;
ClientInfo client_info;
Block sample_block;
BatchHeader(Settings settings_, String query_, Block sample_block_)
BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block sample_block_)
: settings(std::move(settings_))
, query(std::move(query_))
, client_info(std::move(client_info_))
, sample_block(std::move(sample_block_))
{
}
bool operator==(const BatchHeader & other) const
{
return settings == other.settings && query == other.query &&
return settings == other.settings && query == other.query && client_info == other.client_info &&
blocksHaveEqualStructure(sample_block, other.sample_block);
}
@ -445,6 +451,7 @@ struct StorageDistributedDirectoryMonitor::Batch
{
Settings insert_settings;
String insert_query;
ClientInfo client_info;
std::unique_ptr<RemoteBlockOutputStream> remote;
bool first = true;
@ -459,12 +466,12 @@ struct StorageDistributedDirectoryMonitor::Batch
}
ReadBufferFromFile in(file_path->second);
parent.readHeader(in, insert_settings, insert_query, parent.log);
parent.readHeader(in, insert_settings, insert_query, client_info, parent.log);
if (first)
{
first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings);
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings, &client_info);
remote->writePrefix();
}
@ -541,7 +548,8 @@ public:
{
Settings insert_settings;
String insert_query;
StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, log);
ClientInfo client_info;
StorageDistributedDirectoryMonitor::readHeader(in, insert_settings, insert_query, client_info, log);
block_in.readPrefix();
first_block = block_in.read();
@ -610,11 +618,12 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
Block sample_block;
Settings insert_settings;
String insert_query;
ClientInfo client_info;
try
{
/// Determine metadata of the current file and check if it is not broken.
ReadBufferFromFile in{file_path};
readHeader(in, insert_settings, insert_query, log);
readHeader(in, insert_settings, insert_query, client_info, log);
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
@ -641,7 +650,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
throw;
}
BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block));
BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(client_info), std::move(sample_block));
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
batch.file_indices.push_back(file_idx);

View File

@ -70,7 +70,7 @@ private:
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
/// Read insert query and insert settings for backward compatible.
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, Logger * log);
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log);
friend class DirectoryMonitorBlockInputStream;
};

View File

@ -290,7 +290,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (throttler)
job.connection_entry->setThrottler(throttler);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings, &context.getClientInfo());
job.stream->writePrefix();
}
@ -598,6 +598,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
writeVarUInt(ClickHouseRevision::get(), header_buf);
writeStringBinary(query_string, header_buf);
context.getSettingsRef().serialize(header_buf);
context.getClientInfo().write(header_buf, ClickHouseRevision::get());
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);