Merge branch 'master' into reenable_testflows_rbac

This commit is contained in:
mergify[bot] 2021-07-24 15:58:47 +00:00 committed by GitHub
commit b39b130b97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 703 additions and 845 deletions

View File

@ -13,6 +13,3 @@ ClickHouse® is an open-source column-oriented database management system that a
* [Code Browser](https://clickhouse.tech/codebrowser/html_report/ClickHouse/index.html) with syntax highlight and navigation.
* [Contacts](https://clickhouse.tech/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://clickhouse.tech/#meet) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup by ByteDance (online)](https://www.meetup.com/ByteDanceDev-group/events/279543467/) on 23 July 2021.

View File

@ -164,6 +164,10 @@ fi
# if no args passed to `docker run` or first argument start with `--`, then the user is passing clickhouse-server arguments
if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
# Watchdog is launched by default, but does not send SIGINT to the main process,
# so the container can't be finished by ctrl+c
CLICKHOUSE_WATCHDOG_ENABLE=${CLICKHOUSE_WATCHDOG_ENABLE:-0}
export CLICKHOUSE_WATCHDOG_ENABLE
exec $gosu /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@"
fi

View File

@ -5,6 +5,8 @@ toc_title: MaterializeMySQL
# MaterializeMySQL {#materialize-mysql}
**This is experimental feature that should not be used in production.**
Creates ClickHouse database with all the tables existing in MySQL, and all the data in those tables.
ClickHouse server works as MySQL replica. It reads binlog and performs DDL and DML queries.

View File

@ -271,7 +271,8 @@ private:
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
std::cout << "Stopping launch of queries."
<< " Requested time limit " << max_time << " seconds is exhausted.\n";
return false;
}
@ -368,8 +369,7 @@ private:
{
extracted = queue.tryPop(query, 100);
if (shutdown
|| (max_iterations && queries_executed == max_iterations))
if (shutdown || (max_iterations && queries_executed == max_iterations))
{
return;
}
@ -382,8 +382,9 @@ private:
}
catch (...)
{
std::cerr << "An error occurred while processing the query '"
<< query << "'.\n";
std::lock_guard lock(mutex);
std::cerr << "An error occurred while processing the query " << "'" << query << "'"
<< ": " << getCurrentExceptionMessage(false) << std::endl;
if (!continue_on_errors)
{
shutdown = true;

View File

@ -15,8 +15,8 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h>
#include <Interpreters/Context.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <Processors/Pipe.h>
#include <Processors/LimitTransform.h>
#include <Common/SipHash.h>
#include <Common/UTF8Helpers.h>
#include <Common/StringUtils/StringUtils.h>
@ -24,6 +24,10 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Formats/registerFormats.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Core/Block.h>
#include <common/StringRef.h>
#include <common/DateLUT.h>
@ -1156,17 +1160,20 @@ try
if (!silent)
std::cerr << "Training models\n";
BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size);
Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size));
input->readPrefix();
while (Block block = input->read())
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
obfuscator.train(block.getColumns());
source_rows += block.rows();
if (!silent)
std::cerr << "Processed " << source_rows << " rows\n";
}
input->readSuffix();
}
obfuscator.finalize();
@ -1183,15 +1190,26 @@ try
file_in.seek(0, SEEK_SET);
BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size);
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header);
Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size));
if (processed_rows + source_rows > limit)
input = std::make_shared<LimitBlockInputStream>(input, limit - processed_rows, 0);
{
pipe.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<LimitTransform>(cur_header, limit - processed_rows, 0);
});
}
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header);
PullingPipelineExecutor executor(pipeline);
input->readPrefix();
output->writePrefix();
while (Block block = input->read())
Block block;
while (executor.pull(block))
{
Columns columns = obfuscator.generate(block.getColumns());
output->write(header.cloneWithColumns(columns));
@ -1200,7 +1218,6 @@ try
std::cerr << "Processed " << processed_rows << " rows\n";
}
output->writeSuffix();
input->readSuffix();
obfuscator.updateSeed();
}

View File

@ -79,7 +79,7 @@ std::vector<UUID> GrantedRoles::findGranted(const boost::container::flat_set<UUI
{
std::vector<UUID> res;
res.reserve(ids.size());
boost::range::set_difference(ids, roles, std::back_inserter(res));
boost::range::set_intersection(ids, roles, std::back_inserter(res));
return res;
}
@ -110,7 +110,7 @@ std::vector<UUID> GrantedRoles::findGrantedWithAdminOption(const boost::containe
{
std::vector<UUID> res;
res.reserve(ids.size());
boost::range::set_difference(ids, roles_with_admin_option, std::back_inserter(res));
boost::range::set_intersection(ids, roles_with_admin_option, std::back_inserter(res));
return res;
}

View File

@ -32,7 +32,13 @@ SettingsProfilesInfo::getConstraintsAndProfileIDs(const std::shared_ptr<const Se
auto res = std::make_shared<SettingsConstraintsAndProfileIDs>(manager);
res->current_profiles = profiles;
res->constraints = previous ? previous->constraints : constraints;
if (previous)
{
res->constraints = previous->constraints;
res->constraints.merge(constraints);
}
else
res->constraints = constraints;
if (previous)
{

View File

@ -29,6 +29,20 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se
active_connection_count = 1;
}
MultiplexedConnections::MultiplexedConnections(std::shared_ptr<Connection> connection_ptr_, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)
, connection_ptr(connection_ptr_)
{
connection_ptr->setThrottler(throttler);
ReplicaState replica_state;
replica_state.connection = connection_ptr.get();
replica_states.push_back(replica_state);
active_connection_count = 1;
}
MultiplexedConnections::MultiplexedConnections(
std::vector<IConnectionPool::Entry> && connections, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout)

View File

@ -22,6 +22,8 @@ class MultiplexedConnections final : public IConnections
public:
/// Accepts ready connection.
MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_);
/// Accepts ready connection and keep it alive before drain
MultiplexedConnections(std::shared_ptr<Connection> connection_, const Settings & settings_, const ThrottlerPtr & throttler_);
/// Accepts a vector of connections to replicas of one shard already taken from pool.
MultiplexedConnections(
@ -79,7 +81,6 @@ private:
/// Mark the replica as invalid.
void invalidateReplica(ReplicaState & replica_state);
private:
const Settings & settings;
/// The following two fields are from settings but can be referenced outside the lifetime of
@ -95,6 +96,8 @@ private:
/// Connection that received last block.
Connection * current_connection = nullptr;
/// Shared connection, may be empty. Used to keep object alive before draining.
std::shared_ptr<Connection> connection_ptr;
bool sent_query = false;
bool cancelled = false;

View File

@ -1,44 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** A stream of blocks from which you can read the next block from an explicitly provided list.
* Also see OneBlockInputStream.
*/
class BlocksListBlockInputStream : public IBlockInputStream
{
public:
/// Acquires the ownership of the block list.
BlocksListBlockInputStream(BlocksList && list_)
: list(std::move(list_)), it(list.begin()), end(list.end()) {}
/// Uses a list of blocks lying somewhere else.
BlocksListBlockInputStream(BlocksList::iterator & begin_, BlocksList::iterator & end_)
: it(begin_), end(end_) {}
String getName() const override { return "BlocksList"; }
protected:
Block getHeader() const override { return list.empty() ? Block() : *list.begin(); }
Block readImpl() override
{
if (it == end)
return Block();
Block res = *it;
++it;
return res;
}
private:
BlocksList list;
BlocksList::iterator it;
const BlocksList::iterator end;
};
}

View File

@ -1,158 +0,0 @@
#include <algorithm>
#include <DataStreams/LimitBlockInputStream.h>
namespace DB
{
/// gets pointers to all columns of block, which were used for ORDER BY
static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_)
, description(description_)
{
if (use_limit_as_total_rows_approx)
{
addTotalRowsApprox(static_cast<size_t>(limit));
}
children.push_back(input);
}
Block LimitBlockInputStream::readImpl()
{
Block res;
UInt64 rows = 0;
/// pos >= offset + limit and all rows in the end of previous block were equal
/// to row at 'limit' position. So we check current block.
if (!ties_row_ref.empty() && pos >= offset + limit)
{
res = children.back()->read();
rows = res.rows();
if (!res)
return res;
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ptr->sort_columns = extractSortColumns(*ptr, description);
UInt64 len;
for (len = 0; len < rows; ++len)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;
}
}
if (len < rows)
{
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
}
return *ptr;
}
if (pos >= offset + limit)
{
if (!always_read_till_end)
return res;
else
{
while (children.back()->read())
;
return res;
}
}
do
{
res = children.back()->read();
if (!res)
return res;
rows = res.rows();
pos += rows;
} while (pos <= offset);
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
if (with_ties)
ptr->sort_columns = extractSortColumns(*ptr, description);
/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
{
/// Save rowref for last row, because probalbly next block begins with the same row.
if (with_ties && pos == offset + limit)
ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1);
return *ptr;
}
/// give away a piece of the block
UInt64 start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
UInt64 length = std::min(
static_cast<Int64>(limit), std::min(
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
/// check if other rows in current block equals to last one in limit
if (with_ties)
{
ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1);
for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, i);
if (current_row == ties_row_ref)
++length;
else
{
ties_row_ref.reset();
break;
}
}
}
if (length == rows)
return *ptr;
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
// TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
// It's crucial for streaming engines like Kafka.
return *ptr;
}
}

View File

@ -1,47 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/SortDescription.h>
namespace DB
{
/** Implements the LIMIT relational operation.
*/
class LimitBlockInputStream : public IBlockInputStream
{
public:
/** If always_read_till_end = false (by default), then after reading enough data,
* returns an empty block, and this causes the query to be canceled.
* If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
* with_ties = true, when query has WITH TIES modifier. If so, description should be provided
* description lets us know which row we should check for equality
*/
LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_,
bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false,
bool with_ties_ = false, const SortDescription & description_ = {});
String getName() const override { return "Limit"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
private:
UInt64 limit;
UInt64 offset;
UInt64 pos = 0;
bool always_read_till_end;
bool with_ties;
const SortDescription description;
SharedBlockRowRef ties_row_ref;
};
}

View File

@ -1,273 +0,0 @@
#include <queue>
#include <common/logger_useful.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergingSortedBlockInputStream::MergingSortedBlockInputStream(
const BlockInputStreams & inputs_, SortDescription description_,
size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
: description(std::move(description_)), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, source_blocks(inputs_.size())
, cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
, log(&Poco::Logger::get("MergingSortedBlockInputStream"))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
header = children.at(0)->getHeader();
num_columns = header.columns();
}
void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
{
/// Read the first blocks, initialize the queue.
if (first)
{
first = false;
for (size_t i = 0; i < source_blocks.size(); ++i)
{
Block & block = source_blocks[i];
if (block)
continue;
block = children[i]->read();
const size_t rows = block.rows();
if (rows == 0)
continue;
if (expected_block_size < rows)
expected_block_size = std::min(rows, max_block_size);
cursors[i] = SortCursorImpl(block, description, i);
has_collation |= cursors[i].has_collation;
}
if (has_collation)
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
else
queue_without_collation = SortingHeap<SortCursor>(cursors);
}
/// Let's check that all source blocks have the same structure.
for (const auto & block : source_blocks)
{
if (!block)
continue;
assertBlocksHaveEqualStructure(block, header, getName());
}
merged_columns.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
merged_columns[i] = header.safeGetByPosition(i).column->cloneEmpty();
merged_columns[i]->reserve(expected_block_size);
}
}
Block MergingSortedBlockInputStream::readImpl()
{
if (finished)
return {};
if (children.size() == 1)
return children[0]->read();
MutableColumns merged_columns;
init(merged_columns);
if (merged_columns.empty())
return {};
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue_without_collation);
return header.cloneWithColumns(std::move(merged_columns));
}
template <typename TSortCursor>
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue)
{
size_t order = current->order;
size_t size = cursors.size();
if (order >= size || &cursors[order] != current.impl)
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
while (true)
{
source_blocks[order] = children[order]->read();
if (!source_blocks[order])
{
queue.removeTop();
break;
}
if (source_blocks[order].rows())
{
cursors[order].reset(source_blocks[order]);
queue.replaceTop(&cursors[order]);
break;
}
}
}
template <typename TSortingHeap>
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue)
{
size_t merged_rows = 0;
/** Increase row counters.
* Return true if it's time to finish generating the current data block.
*/
auto count_row_and_check_limit = [&, this]()
{
++total_merged_rows;
if (limit && total_merged_rows == limit)
{
// std::cerr << "Limit reached\n";
cancel(false);
finished = true;
return true;
}
++merged_rows;
return merged_rows >= max_block_size;
};
/// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size`
while (queue.isValid())
{
auto current = queue.current();
/** And what if the block is totally less or equal than the rest for the current cursor?
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
*/
if (current->isFirst()
&& (queue.size() == 1
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
{
// std::cerr << "current block is totally less or equals\n";
/// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
if (merged_rows != 0)
{
//std::cerr << "merged rows is non-zero\n";
return;
}
/// Actually, current->order stores source number (i.e. cursors[current->order] == current)
size_t source_num = current->order;
if (source_num >= cursors.size())
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i] = IColumn::mutate(std::move(source_blocks[source_num].getByPosition(i).column));
// std::cerr << "copied columns\n";
merged_rows = merged_columns.at(0)->size();
/// Limit output
if (limit && total_merged_rows + merged_rows > limit)
{
merged_rows = limit - total_merged_rows;
for (size_t i = 0; i < num_columns; ++i)
{
auto & column = merged_columns[i];
column = IColumn::mutate(column->cut(0, merged_rows));
}
cancel(false);
finished = true;
}
/// Write order of rows for other columns
/// this data will be used in grather stream
if (out_row_sources_buf)
{
RowSourcePart row_source(source_num);
for (size_t i = 0; i < merged_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
//std::cerr << "fetching next block\n";
total_merged_rows += merged_rows;
fetchNextBlock(current, queue);
return;
}
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
// std::cerr << "Inserting row\n";
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow());
if (out_row_sources_buf)
{
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
RowSourcePart row_source(current->order);
out_row_sources_buf->write(row_source.data);
}
if (!current->isLast())
{
// std::cerr << "moving to next row\n";
queue.next();
}
else
{
/// We get the next block from the corresponding source, if there is one.
// std::cerr << "It was last row, fetching next block\n";
fetchNextBlock(current, queue);
}
if (count_row_and_check_limit())
return;
}
/// We have read all data. Ask children to cancel providing more data.
cancel(false);
finished = true;
}
void MergingSortedBlockInputStream::readSuffixImpl()
{
if (quiet)
return;
const BlockStreamProfileInfo & profile_info = getProfileInfo();
double seconds = profile_info.total_stopwatch.elapsedSeconds();
if (!seconds)
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", profile_info.blocks, profile_info.rows);
else
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
profile_info.blocks, profile_info.rows, seconds,
profile_info.rows / seconds,
ReadableSize(profile_info.bytes / seconds));
}
}

View File

@ -1,87 +0,0 @@
#pragma once
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/IBlockInputStream.h>
namespace Poco { class Logger; }
namespace DB
{
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IBlockInputStream
{
public:
/** limit - if isn't 0, then we can produce only first limit rows in sorted order.
* out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each read row (and needed flag)
* quiet - don't log profiling info
*/
MergingSortedBlockInputStream(
const BlockInputStreams & inputs_, SortDescription description_, size_t max_block_size_,
UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
void readSuffixImpl() override;
/// Initializes the queue and the columns of next result block.
void init(MutableColumns & merged_columns);
/// Gets the next block from the source corresponding to the `current`.
template <typename TSortCursor>
void fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue);
Block header;
const SortDescription description;
const size_t max_block_size;
UInt64 limit;
UInt64 total_merged_rows = 0;
bool first = true;
bool has_collation = false;
bool quiet = false;
/// May be smaller or equal to max_block_size. To do 'reserve' for columns.
size_t expected_block_size = 0;
/// Blocks currently being merged.
size_t num_columns = 0;
Blocks source_blocks;
SortCursorImpls cursors;
SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SortCursorWithCollation> queue_with_collation;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf;
private:
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
*/
template <typename TSortingHeap>
void merge(MutableColumns & merged_columns, TSortingHeap & queue);
Poco::Logger * log;
/// Read is finished.
bool finished = false;
};
}

View File

@ -34,13 +34,20 @@ namespace ErrorCodes
extern const int DUPLICATED_PART_UUIDS;
}
RemoteQueryExecutor::RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_)
{}
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), sync_draining(true)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, task_iterator_)
{
create_connections = [this, &connection, throttler]()
{
@ -48,6 +55,19 @@ RemoteQueryExecutor::RemoteQueryExecutor(
};
}
RemoteQueryExecutor::RemoteQueryExecutor(
std::shared_ptr<Connection> connection_ptr,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, task_iterator_)
{
create_connections = [this, connection_ptr, throttler]()
{
return std::make_shared<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
};
}
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
std::vector<IConnectionPool::Entry> && connections_,

View File

@ -43,6 +43,13 @@ public:
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr<TaskIterator> task_iterator_ = {});
/// Takes already set connection.
RemoteQueryExecutor(
std::shared_ptr<Connection> connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr<TaskIterator> task_iterator_ = {});
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
@ -105,6 +112,11 @@ public:
const Block & getHeader() const { return header; }
private:
RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_);
Block header;
Block totals;
Block extremes;
@ -124,9 +136,6 @@ private:
/// Initiator identifier for distributed task processing
std::shared_ptr<TaskIterator> task_iterator;
/// Drain connection synchronously when finishing.
bool sync_draining = false;
std::function<std::shared_ptr<IConnections>()> create_connections;
/// Hold a shared reference to the connection pool so that asynchronous connection draining will
/// work safely. Make sure it's the first member so that we don't destruct it too early.

View File

@ -4,6 +4,9 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
@ -32,32 +35,38 @@ struct TemporaryFileStream
{}
/// Flush data from input stream into file for future reading
static void write(const std::string & path, const Block & header, IBlockInputStream & input,
std::atomic<bool> * is_cancelled, const std::string & codec)
static void write(const std::string & path, const Block & header, QueryPipeline pipeline, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
copyData(input, output, is_cancelled);
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}
};
class TemporaryFileLazyInputStream : public IBlockInputStream
class TemporaryFileLazySource : public ISource
{
public:
TemporaryFileLazyInputStream(const std::string & path_, const Block & header_)
: path(path_)
, header(header_)
TemporaryFileLazySource(const std::string & path_, const Block & header_)
: ISource(header_)
, path(path_)
, done(false)
{}
String getName() const override { return "TemporaryFile"; }
Block getHeader() const override { return header; }
void readSuffix() override {}
String getName() const override { return "TemporaryFileLazySource"; }
protected:
Block readImpl() override
Chunk generate() override
{
if (done)
return {};
@ -71,7 +80,7 @@ protected:
done = true;
stream.reset();
}
return block;
return Chunk(block.getColumns(), block.rows());
}
private:

View File

@ -1,11 +1,10 @@
#include <gtest/gtest.h>
#include <Core/Block.h>
#include <Columns/ColumnVector.h>
#include <DataStreams/BlocksListBlockInputStream.h>
#include <Processors/Sources/BlocksListSource.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/QueryPipeline.h>
@ -40,7 +39,7 @@ static Pipe getInputStreams(const std::vector<std::string> & column_names, const
size_t start = stride;
while (blocks_count--)
blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<BlocksListBlockInputStream>(std::move(blocks))));
pipes.emplace_back(std::make_shared<BlocksListSource>(std::move(blocks)));
}
return Pipe::unitePipes(std::move(pipes));
@ -57,7 +56,7 @@ static Pipe getInputStreamsEqualStride(const std::vector<std::string> & column_n
size_t start = i;
while (blocks_count--)
blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<BlocksListBlockInputStream>(std::move(blocks))));
pipes.emplace_back(std::make_shared<BlocksListSource>(std::move(blocks)));
i++;
}
return Pipe::unitePipes(std::move(pipes));

View File

@ -2,8 +2,10 @@
#include <gtest/gtest.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/BlocksListBlockInputStream.h>
#include <DataStreams/CheckSortedBlockInputStream.h>
#include <Processors/Sources/BlocksListSource.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
#include <DataTypes/DataTypesNumber.h>
@ -89,14 +91,22 @@ TEST(CheckSortedBlockInputStream, CheckGoodCase)
for (size_t i = 0; i < 3; ++i)
blocks.push_back(getSortedBlockWithSize(key_columns, 10, 1, i * 10));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description);
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_EQ(sorted.read(), Block());
PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_FALSE(executor.pull(chunk));
}
TEST(CheckSortedBlockInputStream, CheckBadLastRow)
@ -109,14 +119,21 @@ TEST(CheckSortedBlockInputStream, CheckBadLastRow)
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 0));
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 300));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description);
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
PullingPipelineExecutor executor(pipeline);
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_THROW(sorted.read(), DB::Exception);
Chunk chunk;
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_THROW(executor.pull(chunk), DB::Exception);
}
@ -127,11 +144,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1)
BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 5, 1, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description);
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_THROW(sorted.read(), DB::Exception);
PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_THROW(executor.pull(chunk), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
@ -141,11 +166,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 99, 2, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description);
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_THROW(sorted.read(), DB::Exception);
PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_THROW(executor.pull(chunk), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
@ -155,11 +188,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 50, 0, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description);
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_THROW(sorted.read(), DB::Exception);
PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_THROW(executor.pull(chunk), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckEqualBlock)
@ -171,11 +212,19 @@ TEST(CheckSortedBlockInputStream, CheckEqualBlock)
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 10));
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 1));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe pipe(std::make_shared<BlocksListSource>(std::move(blocks)));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, sort_description);
});
CheckSortedBlockInputStream sorted(stream, sort_description);
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
PullingPipelineExecutor executor(pipeline);
Chunk chunk;
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
EXPECT_NO_THROW(executor.pull(chunk));
}

View File

@ -18,7 +18,6 @@ SRCS(
BlockIO.cpp
BlockStreamProfileInfo.cpp
CheckConstraintsBlockOutputStream.cpp
CheckSortedBlockInputStream.cpp
ColumnGathererStream.cpp
ConvertingBlockInputStream.cpp
CountingBlockOutputStream.cpp
@ -28,9 +27,7 @@ SRCS(
IBlockInputStream.cpp
ITTLAlgorithm.cpp
InternalTextLogsRowOutputStream.cpp
LimitBlockInputStream.cpp
MaterializingBlockInputStream.cpp
MergingSortedBlockInputStream.cpp
MongoDBBlockInputStream.cpp
NativeBlockInputStream.cpp
NativeBlockOutputStream.cpp

67
src/IO/OpenedFile.cpp Normal file
View File

@ -0,0 +1,67 @@
#include <unistd.h>
#include <fcntl.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>
#include <IO/OpenedFile.h>
namespace ProfileEvents
{
extern const Event FileOpen;
}
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_CLOSE_FILE;
}
void OpenedFile::open(int flags)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
fd = ::open(file_name.c_str(), (flags == -1 ? 0 : flags) | O_RDONLY | O_CLOEXEC);
if (-1 == fd)
throwFromErrnoWithPath("Cannot open file " + file_name, file_name,
errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
std::string OpenedFile::getFileName() const
{
return file_name;
}
OpenedFile::OpenedFile(const std::string & file_name_, int flags)
: file_name(file_name_)
{
open(flags);
}
OpenedFile::~OpenedFile()
{
if (fd != -1)
close(); /// Exceptions will lead to std::terminate and that's Ok.
}
void OpenedFile::close()
{
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
metric_increment.destroy();
}
}

39
src/IO/OpenedFile.h Normal file
View File

@ -0,0 +1,39 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <memory>
namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
}
namespace DB
{
/// RAII for readonly opened file descriptor.
class OpenedFile
{
public:
OpenedFile(const std::string & file_name_, int flags);
~OpenedFile();
/// Close prematurally.
void close();
int getFD() const { return fd; }
std::string getFileName() const;
private:
std::string file_name;
int fd = -1;
CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead};
void open(int flags);
};
}

74
src/IO/OpenedFileCache.h Normal file
View File

@ -0,0 +1,74 @@
#pragma once
#include <map>
#include <mutex>
#include <Core/Types.h>
#include <Common/ProfileEvents.h>
#include <IO/OpenedFile.h>
namespace ProfileEvents
{
extern const Event OpenedFileCacheHits;
extern const Event OpenedFileCacheMisses;
}
namespace DB
{
/** Cache of opened files for reading.
* It allows to share file descriptors when doing reading with 'pread' syscalls on readonly files.
* Note: open/close of files is very cheap on Linux and we should not bother doing it 10 000 times a second.
* (This may not be the case on Windows with WSL. This is also not the case if strace is active. Neither when some eBPF is loaded).
* But sometimes we may end up opening one file multiple times, that increases chance exhausting opened files limit.
*/
class OpenedFileCache
{
private:
using Key = std::pair<std::string /* path */, int /* flags */>;
using OpenedFileWeakPtr = std::weak_ptr<OpenedFile>;
using Files = std::map<Key, OpenedFileWeakPtr>;
Files files;
std::mutex mutex;
public:
using OpenedFilePtr = std::shared_ptr<OpenedFile>;
OpenedFilePtr get(const std::string & path, int flags)
{
Key key(path, flags);
std::lock_guard lock(mutex);
auto [it, inserted] = files.emplace(key, OpenedFilePtr{});
if (!inserted)
if (auto res = it->second.lock())
return res;
OpenedFilePtr res
{
new OpenedFile(path, flags),
[key, this](auto ptr)
{
{
std::lock_guard another_lock(mutex);
files.erase(key);
}
delete ptr;
}
};
it->second = res;
return res;
}
};
using OpenedFileCachePtr = std::shared_ptr<OpenedFileCache>;
}

View File

@ -88,4 +88,7 @@ void ReadBufferFromFile::close()
metric_increment.destroy();
}
OpenedFileCache ReadBufferFromFilePReadWithCache::cache;
}

View File

@ -1,12 +1,14 @@
#pragma once
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/OpenedFileCache.h>
#include <Common/CurrentMetrics.h>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
@ -60,4 +62,31 @@ public:
}
};
/** Similar to ReadBufferFromFilePRead but also transparently shares open file descriptors.
*/
class ReadBufferFromFilePReadWithCache : public ReadBufferFromFileDescriptorPRead
{
private:
static OpenedFileCache cache;
std::string file_name;
OpenedFileCache::OpenedFilePtr file;
public:
ReadBufferFromFilePReadWithCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment),
file_name(file_name_)
{
file = cache.get(file_name, flags);
fd = file->getFD();
}
std::string getFileName() const override
{
return file_name;
}
};
}

View File

@ -75,7 +75,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
/// Attempt to open a file with O_DIRECT
try
{
auto res = std::make_unique<ReadBufferFromFile>(
auto res = std::make_unique<ReadBufferFromFilePReadWithCache>(
filename, buffer_size, (flags == -1 ? O_RDONLY | O_CLOEXEC : flags) | O_DIRECT, existing_memory, alignment);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferDirectIO);
return res;
@ -92,7 +92,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
#endif
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
return std::make_unique<ReadBufferFromFile>(filename, buffer_size, flags, existing_memory, alignment);
return std::make_unique<ReadBufferFromFilePReadWithCache>(filename, buffer_size, flags, existing_memory, alignment);
}
}

View File

@ -3,7 +3,6 @@
#include <Columns/ColumnNullable.h>
#include <Core/NamesAndTypes.h>
#include <Core/SortCursor.h>
#include <DataStreams/BlocksListBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeNullable.h>
@ -12,10 +11,10 @@
#include <Interpreters/TableJoin.h>
#include <Interpreters/join_common.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Sources/BlocksListSource.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
namespace DB
@ -577,8 +576,7 @@ void MergeJoin::mergeInMemoryRightBlocks()
if (right_blocks.empty())
return;
auto stream = std::make_shared<BlocksListBlockInputStream>(std::move(right_blocks.blocks));
Pipe source(std::make_shared<SourceFromInputStream>(std::move(stream)));
Pipe source(std::make_shared<BlocksListSource>(std::move(right_blocks.blocks)));
right_blocks.clear();
QueryPipeline pipeline;

View File

@ -17,7 +17,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <DataStreams/CheckSortedBlockInputStream.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
@ -901,12 +901,18 @@ BlockInputStreamPtr MutationsInterpreter::execute()
select_interpreter->buildQueryPlan(plan);
auto pipeline = addStreamsForLaterStages(stages, plan);
BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(*pipeline));
/// Sometimes we update just part of columns (for example UPDATE mutation)
/// in this case we don't read sorting key, so just we don't check anything.
if (auto sort_desc = getStorageSortDescriptionIfPossible(result_stream->getHeader()))
result_stream = std::make_shared<CheckSortedBlockInputStream>(result_stream, *sort_desc);
if (auto sort_desc = getStorageSortDescriptionIfPossible(pipeline->getHeader()))
{
pipeline->addSimpleTransform([&](const Block & header)
{
return std::make_shared<CheckSortedTransform>(header, *sort_desc);
});
}
BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(*pipeline));
if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader());

View File

@ -1,7 +1,9 @@
#include <Core/SortCursor.h>
#include <Interpreters/SortedBlocksWriter.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/materializeBlock.h>
#include <Disks/IVolume.h>
@ -10,40 +12,36 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
namespace
{
std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, IBlockInputStream & stream, const String & codec)
std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, QueryPipeline pipeline, const String & codec)
{
auto tmp_file = createTemporaryFile(tmp_path);
std::atomic<bool> is_cancelled{false};
TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled, codec);
if (is_cancelled)
throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
TemporaryFileStream::write(tmp_file->path(), header, std::move(pipeline), codec);
return tmp_file;
}
SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream,
SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, QueryPipeline pipeline,
const String & codec, std::function<void(const Block &)> callback = [](const Block &){})
{
std::vector<std::unique_ptr<TemporaryFile>> files;
PullingPipelineExecutor executor(pipeline);
while (Block block = stream.read())
Block block;
while (executor.pull(block))
{
if (!block.rows())
continue;
callback(block);
OneBlockInputStream block_stream(block);
auto tmp_file = flushToFile(tmp_path, header, block_stream, codec);
QueryPipeline one_block_pipeline;
Chunk chunk(block.getColumns(), block.rows());
one_block_pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk))));
auto tmp_file = flushToFile(tmp_path, header, std::move(one_block_pipeline), codec);
files.emplace_back(std::move(tmp_file));
}
@ -119,23 +117,30 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
{
const std::string path = getPath();
if (blocks.empty())
Pipes pipes;
pipes.reserve(blocks.size());
for (const auto & block : blocks)
if (auto num_rows = block.rows())
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), num_rows)));
if (pipes.empty())
return {};
if (blocks.size() == 1)
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
if (pipeline.getNumStreams() > 1)
{
OneBlockInputStream sorted_input(blocks.front());
return flushToFile(path, sample_block, sorted_input, codec);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
pipeline.addTransform(std::move(transform));
}
BlockInputStreams inputs;
inputs.reserve(blocks.size());
for (const auto & block : blocks)
if (block.rows())
inputs.push_back(std::make_shared<OneBlockInputStream>(block));
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
return flushToFile(path, sample_block, sorted_input, codec);
return flushToFile(path, sample_block, std::move(pipeline), codec);
}
SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
@ -158,8 +163,8 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
if (!blocks.empty())
files.emplace_back(flush(blocks));
BlockInputStreams inputs;
inputs.reserve(num_files_for_merge);
Pipes pipes;
pipes.reserve(num_files_for_merge);
/// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge.
{
@ -170,13 +175,26 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
{
for (const auto & file : files)
{
inputs.emplace_back(streamFromFile(file));
pipes.emplace_back(streamFromFile(file));
if (inputs.size() == num_files_for_merge || &file == &files.back())
if (pipes.size() == num_files_for_merge || &file == &files.back())
{
MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block);
new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec));
inputs.clear();
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
pipes = Pipes();
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
pipeline.addTransform(std::move(transform));
}
new_files.emplace_back(flushToFile(getPath(), sample_block, std::move(pipeline), codec));
}
}
@ -185,22 +203,35 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
}
for (const auto & file : files)
inputs.emplace_back(streamFromFile(file));
pipes.emplace_back(streamFromFile(file));
}
return PremergedFiles{std::move(files), std::move(inputs)};
return PremergedFiles{std::move(files), Pipe::unitePipes(std::move(pipes))};
}
SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<void(const Block &)> callback)
{
PremergedFiles files = premerge();
MergingSortedBlockInputStream sorted_input(files.streams, sort_description, rows_in_block);
return flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback);
QueryPipeline pipeline;
pipeline.init(std::move(files.pipe));
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
pipeline.addTransform(std::move(transform));
}
return flushToManyFiles(getPath(), sample_block, std::move(pipeline), codec, callback);
}
BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{
return std::make_shared<TemporaryFileLazyInputStream>(file->path(), materializeBlock(sample_block));
return Pipe(std::make_shared<TemporaryFileLazySource>(file->path(), materializeBlock(sample_block)));
}
String SortedBlocksWriter::getPath() const
@ -250,18 +281,35 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
size_t num_rows = 0;
{ /// Merge sort blocks
BlockInputStreams inputs;
inputs.reserve(blocks.size());
Pipes pipes;
pipes.reserve(blocks.size());
for (auto & block : blocks)
{
num_rows += block.rows();
inputs.emplace_back(std::make_shared<OneBlockInputStream>(block));
Chunk chunk(block.getColumns(), block.rows());
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk)));
}
Blocks tmp_blocks;
MergingSortedBlockInputStream stream(inputs, sort_description, num_rows);
while (const auto & block = stream.read())
QueryPipeline pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
num_rows);
pipeline.addTransform(std::move(transform));
}
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
tmp_blocks.emplace_back(block);
blocks.swap(tmp_blocks);

View File

@ -6,6 +6,7 @@
#include <Common/filesystemHelpers.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Processors/Pipe.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
@ -17,6 +18,8 @@ class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class Pipe;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
@ -56,7 +59,7 @@ struct SortedBlocksWriter
struct PremergedFiles
{
SortedFiles files;
BlockInputStreams streams;
Pipe pipe;
};
static constexpr const size_t num_streams = 2;
@ -94,7 +97,7 @@ struct SortedBlocksWriter
}
String getPath() const;
BlockInputStreamPtr streamFromFile(const TmpFilePtr & file) const;
Pipe streamFromFile(const TmpFilePtr & file) const;
void insert(Block && block);
TmpFilePtr flush(const BlocksList & blocks) const;

View File

@ -25,8 +25,6 @@ public:
bool id_mode = false; /// whether this set keep UUIDs instead of names
bool use_keyword_any = false; /// whether the keyword ANY should be used instead of the keyword ALL
bool none_role_parsed = false; /// whether keyword NONE has been parsed
bool empty() const { return names.empty() && !current_user && !all; }
void replaceCurrentUserTag(const String & current_user_name);

View File

@ -293,42 +293,15 @@ bool ParserGrantQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool replace_access = false;
bool replace_role = false;
if (is_replace)
{
if (roles)
{ // assigning role mode
if (!roles->empty() && roles->none_role_parsed)
throw Exception("In assigning role WITH REPLACE OPTION sql, 'NONE' can only be used alone to rovoke all roles", ErrorCodes::SYNTAX_ERROR);
}
replace_role = true;
else
{
// granting privilege mode
replace_access = true;
bool new_access = false;
bool none_on_all = false;
for (auto & element : elements)
{
if (element.access_flags.isEmpty())
{
if (element.any_database)
none_on_all = true;
else
throw Exception("In granting privilege WITH REPLACE OPTION sql, 'NONE ON db.*' should be 'NONE ON *.*', and can only be used alone to drop all privileges on any database", ErrorCodes::SYNTAX_ERROR);
}
else
{
new_access = true;
}
}
if (new_access && none_on_all)
throw Exception("In granting privilege WITH REPLACE OPTION sql, 'NONE ON db.*' should be 'NONE ON *.*', and can only be used alone to drop all privileges on any database", ErrorCodes::SYNTAX_ERROR);
}
}
if (is_replace && !replace_access && roles && !roles->empty() && roles->none_role_parsed)
throw Exception("In REPLACE GRANT assigning role sql, 'NONE' can only be used alone to rovoke all roles", ErrorCodes::SYNTAX_ERROR);
if (!is_revoke)
eraseNonGrantable(elements);
@ -343,7 +316,7 @@ bool ParserGrantQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->grantees = std::move(grantees);
query->admin_option = admin_option;
query->replace_access = replace_access;
query->replace_granted_roles = (is_replace && !replace_access);
query->replace_granted_roles = replace_role;
return true;
}

View File

@ -44,22 +44,17 @@ namespace
bool allow_current_user,
bool & all,
Strings & names,
bool & current_user,
bool & none_role_parsed)
bool & current_user)
{
bool res_all = false;
Strings res_names;
bool res_current_user = false;
Strings res_with_roles_names;
bool parsed_none = false;
auto parse_element = [&]
{
if (ParserKeyword{"NONE"}.ignore(pos, expected))
{
parsed_none = true;
return true;
}
if (allow_all && ParserKeyword{"ALL"}.ignore(pos, expected))
{
@ -95,7 +90,6 @@ namespace
names = std::move(res_names);
current_user = res_current_user;
all = res_all;
none_role_parsed = parsed_none;
return true;
}
@ -105,15 +99,14 @@ namespace
bool id_mode,
bool allow_current_user,
Strings & except_names,
bool & except_current_user,
bool & parsed_none)
bool & except_current_user)
{
return IParserBase::wrapParseImpl(pos, [&] {
if (!ParserKeyword{"EXCEPT"}.ignore(pos, expected))
return false;
bool unused;
return parseBeforeExcept(pos, expected, id_mode, false, false, allow_current_user, unused, except_names, except_current_user, parsed_none);
return parseBeforeExcept(pos, expected, id_mode, false, false, allow_current_user, unused, except_names, except_current_user);
});
}
}
@ -126,12 +119,11 @@ bool ParserRolesOrUsersSet::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
bool current_user = false;
Strings except_names;
bool except_current_user = false;
bool parsed_none = false;
if (!parseBeforeExcept(pos, expected, id_mode, allow_all, allow_any, allow_current_user, all, names, current_user, parsed_none))
if (!parseBeforeExcept(pos, expected, id_mode, allow_all, allow_any, allow_current_user, all, names, current_user))
return false;
parseExceptAndAfterExcept(pos, expected, id_mode, allow_current_user, except_names, except_current_user, parsed_none);
parseExceptAndAfterExcept(pos, expected, id_mode, allow_current_user, except_names, except_current_user);
if (all)
names.clear();
@ -146,7 +138,6 @@ bool ParserRolesOrUsersSet::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
result->allow_roles = allow_roles;
result->id_mode = id_mode;
result->use_keyword_any = all && allow_any && !allow_all;
result->none_role_parsed = parsed_none;
node = result;
return true;
}

View File

@ -0,0 +1,47 @@
#pragma once
#include <Processors/Sources/SourceWithProgress.h>
namespace DB
{
/** A stream of blocks from which you can read the next block from an explicitly provided list.
* Also see OneBlockInputStream.
*/
class BlocksListSource : public SourceWithProgress
{
public:
/// Acquires the ownership of the block list.
explicit BlocksListSource(BlocksList && list_)
: SourceWithProgress(list_.empty() ? Block() : list_.front().cloneEmpty())
, list(std::move(list_)), it(list.begin()), end(list.end()) {}
/// Uses a list of blocks lying somewhere else.
BlocksListSource(BlocksList::iterator & begin_, BlocksList::iterator & end_)
: SourceWithProgress(begin_ == end_ ? Block() : begin_->cloneEmpty())
, it(begin_), end(end_) {}
String getName() const override { return "BlocksListSource"; }
protected:
Chunk generate() override
{
if (it == end)
return {};
Block res = *it;
++it;
size_t num_rows = res.rows();
return Chunk(res.getColumns(), num_rows);
}
private:
BlocksList list;
BlocksList::iterator it;
const BlocksList::iterator end;
};
}

View File

@ -1,4 +1,4 @@
#include <DataStreams/CheckSortedBlockInputStream.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Common/FieldVisitorDump.h>
#include <Common/quoteString.h>
#include <Core/SortDescription.h>
@ -12,20 +12,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
CheckSortedBlockInputStream::CheckSortedBlockInputStream(
const BlockInputStreamPtr & input_,
CheckSortedTransform::CheckSortedTransform(
const Block & header_,
const SortDescription & sort_description_)
: header(input_->getHeader())
: ISimpleTransform(header_, header_, false)
, sort_description_map(addPositionsToSortDescriptions(sort_description_))
{
children.push_back(input_);
}
SortDescriptionsWithPositions
CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescription & sort_description)
CheckSortedTransform::addPositionsToSortDescriptions(const SortDescription & sort_description)
{
SortDescriptionsWithPositions result;
result.reserve(sort_description.size());
const auto & header = getInputPort().getHeader();
for (SortColumnDescription description_copy : sort_description)
{
@ -39,11 +39,11 @@ CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescriptio
}
Block CheckSortedBlockInputStream::readImpl()
void CheckSortedTransform::transform(Chunk & chunk)
{
Block block = children.back()->read();
if (!block || block.rows() == 0)
return block;
size_t num_rows = chunk.getNumRows();
if (num_rows == 0)
return;
auto check = [this](const Columns & left, size_t left_index, const Columns & right, size_t right_index)
{
@ -70,23 +70,20 @@ Block CheckSortedBlockInputStream::readImpl()
}
};
auto block_columns = block.getColumns();
const auto & chunk_columns = chunk.getColumns();
if (!last_row.empty())
check(last_row, 0, block_columns, 0);
check(last_row, 0, chunk_columns, 0);
size_t rows = block.rows();
for (size_t i = 1; i < rows; ++i)
check(block_columns, i - 1, block_columns, i);
for (size_t i = 1; i < num_rows; ++i)
check(chunk_columns, i - 1, chunk_columns, i);
last_row.clear();
for (size_t i = 0; i < block.columns(); ++i)
for (const auto & chunk_column : chunk_columns)
{
auto column = block_columns[i]->cloneEmpty();
column->insertFrom(*block_columns[i], rows - 1);
auto column = chunk_column->cloneEmpty();
column->insertFrom(*chunk_column, num_rows - 1);
last_row.emplace_back(std::move(column));
}
return block;
}
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/ISimpleTransform.h>
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
@ -9,26 +9,23 @@ using SortDescriptionsWithPositions = std::vector<SortColumnDescription>;
/// Streams checks that flow of blocks is sorted in the sort_description order
/// Othrewise throws exception in readImpl function.
class CheckSortedBlockInputStream : public IBlockInputStream
class CheckSortedTransform : public ISimpleTransform
{
public:
CheckSortedBlockInputStream(
const BlockInputStreamPtr & input_,
CheckSortedTransform(
const Block & header_,
const SortDescription & sort_description_);
String getName() const override { return "CheckingSorted"; }
String getName() const override { return "CheckSortedTransform"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
void transform(Chunk & chunk) override;
private:
Block header;
SortDescriptionsWithPositions sort_description_map;
Columns last_row;
private:
/// Just checks, that all sort_descriptions has column_number
SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description);
};

View File

@ -144,6 +144,7 @@ SRCS(
Transforms/AggregatingInOrderTransform.cpp
Transforms/AggregatingTransform.cpp
Transforms/ArrayJoinTransform.cpp
Transforms/CheckSortedTransform.cpp
Transforms/CopyTransform.cpp
Transforms/CreatingSetsTransform.cpp
Transforms/CubeTransform.cpp

View File

@ -16,7 +16,7 @@ limitations under the License. */
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/BlocksSource.h>
#include <Processors/Sources/BlocksSource.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>

View File

@ -158,6 +158,16 @@ static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
}
inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
MergeTreeData::MergeTreeData(
const StorageID & table_id_,
const String & relative_data_path_,
@ -1246,7 +1256,11 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
PartLogElement part_log_elem;
part_log_elem.event_type = PartLogElement::REMOVE_PART;
part_log_elem.event_time = time(nullptr);
const auto time_now = std::chrono::system_clock::now();
part_log_elem.event_time = time_in_seconds(time_now);
part_log_elem.event_time_microseconds = time_in_microseconds(time_now);
part_log_elem.duration_ms = 0; //-V1048
part_log_elem.database_name = table_id.database_name;
@ -4579,17 +4593,6 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
return true;
}
inline UInt64 time_in_microseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::microseconds>(timepoint.time_since_epoch()).count();
}
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
}
void MergeTreeData::writePartLog(
PartLogElement::Type type,
const ExecutionStatus & execution_status,

View File

@ -106,7 +106,6 @@ Pipe StorageS3Cluster::read(
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
Pipes pipes;
connections.reserve(cluster->getShardCount());
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
@ -115,18 +114,19 @@ Pipe StorageS3Cluster::read(
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
connections.emplace_back(std::make_shared<Connection>(
auto connection = std::make_shared<Connection>(
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.cluster, node.cluster_secret,
"S3ClusterInititiator",
node.compression,
node.secure
));
);
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(),
connection,
queryToString(query_info.query),
header,
context,

View File

@ -50,8 +50,6 @@ protected:
const String & compression_method_);
private:
/// Connections from initiator to other nodes
std::vector<std::shared_ptr<Connection>> connections;
StorageS3::ClientAuthentication client_auth;
String filename;

View File

@ -6,6 +6,13 @@ cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
session_id_counter = 0
def new_session_id():
global session_id_counter
session_id_counter += 1
return 'session #' + str(session_id_counter)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
@ -138,6 +145,27 @@ def test_revoke_requires_admin_option():
assert instance.query("SHOW GRANTS FOR B") == ""
def test_set_role():
instance.query("CREATE USER A")
instance.query("CREATE ROLE R1, R2")
instance.query("GRANT R1, R2 TO A")
session_id = new_session_id()
assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R1", 0, 1], ["R2", 0, 1]])
instance.http_query('SET ROLE R1', user='A', params={'session_id':session_id})
assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R1", 0, 1]])
instance.http_query('SET ROLE R2', user='A', params={'session_id':session_id})
assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R2", 0, 1]])
instance.http_query('SET ROLE NONE', user='A', params={'session_id':session_id})
assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([])
instance.http_query('SET ROLE DEFAULT', user='A', params={'session_id':session_id})
assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R1", 0, 1], ["R2", 0, 1]])
def test_introspection():
instance.query("CREATE USER A")
instance.query("CREATE USER B")

View File

@ -214,12 +214,15 @@ def test_show_profiles():
def test_set_profile():
instance.query("CREATE SETTINGS PROFILE P1 SETTINGS max_memory_usage=10000000001")
instance.query("CREATE SETTINGS PROFILE P1 SETTINGS max_memory_usage=10000000001 MAX 20000000002")
session_id = new_session_id()
instance.http_query("SET profile='P1'", user='robin', params={'session_id':session_id})
assert instance.http_query("SELECT getSetting('max_memory_usage')", user='robin', params={'session_id':session_id}) == "10000000001\n"
expected_error = "max_memory_usage shouldn't be greater than 20000000002"
assert expected_error in instance.http_query_and_get_error("SET max_memory_usage=20000000003", user='robin', params={'session_id':session_id})
def test_changing_default_profiles_affects_new_sessions_only():
instance.query("CREATE SETTINGS PROFILE P1 SETTINGS max_memory_usage=10000000001")

View File

@ -10,16 +10,27 @@ ORDER BY key;
INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000);
-- Check NewPart
SYSTEM FLUSH LOGS;
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE "table" = 'table_with_single_pk'
AND "database" = currentDatabase()
WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'NewPart'
ORDER BY event_time DESC
LIMIT 1
) AS time
SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail');
DROP TABLE IF EXISTS table_with_single_pk;
-- Now let's check RemovePart
TRUNCATE TABLE table_with_single_pk;
SYSTEM FLUSH LOGS;
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'RemovePart'
ORDER BY event_time DESC
LIMIT 1
) AS time
SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail');
DROP TABLE table_with_single_pk;

View File

@ -16,3 +16,5 @@ ${CLICKHOUSE_CLIENT} --query "SELECT 1" --user dns_fail_1 --host ${MYHOSTNAME}
${CLICKHOUSE_CLIENT} --query "SELECT 2" --user dns_fail_2 --host ${MYHOSTNAME}
${CLICKHOUSE_CLIENT} --query "DROP USER IF EXISTS dns_fail_1, dns_fail_2"
${CLICKHOUSE_CLIENT} --query "SYSTEM DROP DNS CACHE"

View File

@ -26,3 +26,11 @@ GRANT test_role_01999 TO test_user_01999
K
GRANT SHOW ON db8.* TO test_user_01999
L
GRANT SELECT ON db9.tb3 TO test_user_01999
M
GRANT SELECT ON db9.tb3 TO test_user_01999
GRANT test_role_01999 TO test_user_01999
N
GRANT SELECT ON db9.tb3 TO test_user_01999
GRANT test_role_01999_1 TO test_user_01999
O

View File

@ -54,7 +54,22 @@ SELECT 'K';
GRANT NONE TO test_user_01999 WITH REPLACE OPTION;
SHOW GRANTS FOR test_user_01999;
SELECT 'L';
GRANT NONE ON *.*, SELECT on db9.tb3 TO test_user_01999 WITH REPLACE OPTION;
SHOW GRANTS FOR test_user_01999;
SELECT 'M';
GRANT test_role_01999 to test_user_01999;
SHOW GRANTS FOR test_user_01999;
SELECT 'N';
DROP ROLE IF EXISTS test_role_01999_1;
CREATE role test_role_01999_1;
GRANT NONE, test_role_01999_1 TO test_user_01999 WITH REPLACE OPTION;
SHOW GRANTS FOR test_user_01999;
DROP USER IF EXISTS test_user_01999;
DROP ROLE IF EXISTS test_role_01999;
DROP ROLE IF EXISTS test_role_01999_1;
SELECT 'L';
SELECT 'O';

View File

@ -44,8 +44,8 @@ def error_window_function_in_where(self):
def error_window_function_in_join(self):
"""Check that trying to use window function in `JOIN` returns an error.
"""
exitcode = 48
message = "DB::Exception: JOIN ON inequalities are not supported. Unexpected 'row_number() OVER (ORDER BY salary ASC) < 10"
exitcode = 147
message = "DB::Exception: Cannot get JOIN keys from JOIN ON section: row_number() OVER (ORDER BY salary ASC) < 10"
sql = ("SELECT * FROM empsalary INNER JOIN tenk1 ON row_number() OVER (ORDER BY salary) < 10")