From d0412994a203099ab86f09db3c63ae4d6fe1cd89 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Thu, 22 Jul 2021 16:32:51 +0300 Subject: [PATCH 01/19] Fix SET ROLE. --- src/Access/GrantedRoles.cpp | 4 ++-- tests/integration/test_role/test.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/Access/GrantedRoles.cpp b/src/Access/GrantedRoles.cpp index 2659f8a3ec9..f72b153de65 100644 --- a/src/Access/GrantedRoles.cpp +++ b/src/Access/GrantedRoles.cpp @@ -80,7 +80,7 @@ std::vector GrantedRoles::findGranted(const boost::container::flat_set res; res.reserve(ids.size()); - boost::range::set_difference(ids, roles, std::back_inserter(res)); + boost::range::set_intersection(ids, roles, std::back_inserter(res)); return res; } @@ -111,7 +111,7 @@ std::vector GrantedRoles::findGrantedWithAdminOption(const boost::containe { std::vector res; res.reserve(ids.size()); - boost::range::set_difference(ids, roles_with_admin_option, std::back_inserter(res)); + boost::range::set_intersection(ids, roles_with_admin_option, std::back_inserter(res)); return res; } diff --git a/tests/integration/test_role/test.py b/tests/integration/test_role/test.py index ccd3477ed72..d82e6a1e7b2 100644 --- a/tests/integration/test_role/test.py +++ b/tests/integration/test_role/test.py @@ -6,6 +6,13 @@ cluster = ClickHouseCluster(__file__) instance = cluster.add_instance('instance') +session_id_counter = 0 +def new_session_id(): + global session_id_counter + session_id_counter += 1 + return 'session #' + str(session_id_counter) + + @pytest.fixture(scope="module", autouse=True) def started_cluster(): try: @@ -138,6 +145,27 @@ def test_revoke_requires_admin_option(): assert instance.query("SHOW GRANTS FOR B") == "" +def test_set_role(): + instance.query("CREATE USER A") + instance.query("CREATE ROLE R1, R2") + instance.query("GRANT R1, R2 TO A") + + session_id = new_session_id() + assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R1", 0, 1], ["R2", 0, 1]]) + + instance.http_query('SET ROLE R1', user='A', params={'session_id':session_id}) + assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R1", 0, 1]]) + + instance.http_query('SET ROLE R2', user='A', params={'session_id':session_id}) + assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R2", 0, 1]]) + + instance.http_query('SET ROLE NONE', user='A', params={'session_id':session_id}) + assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([]) + + instance.http_query('SET ROLE DEFAULT', user='A', params={'session_id':session_id}) + assert instance.http_query('SHOW CURRENT ROLES', user='A', params={'session_id':session_id}) == TSV([["R1", 0, 1], ["R2", 0, 1]]) + + def test_introspection(): instance.query("CREATE USER A") instance.query("CREATE USER B") From fd754430ebec144c4199a34a0bcc7d1967327757 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 22 Jul 2021 19:05:52 +0300 Subject: [PATCH 02/19] Remove more streams. --- programs/obfuscator/Obfuscator.cpp | 41 ++- src/Access/ya.make | 1 + src/DataStreams/BlocksListBlockInputStream.h | 44 --- src/DataStreams/LimitBlockInputStream.cpp | 158 ---------- src/DataStreams/LimitBlockInputStream.h | 47 --- .../MergingSortedBlockInputStream.cpp | 273 ------------------ .../MergingSortedBlockInputStream.h | 87 ------ src/DataStreams/TemporaryFileStream.h | 33 ++- .../gtest_blocks_size_merging_streams.cpp | 7 +- .../tests/gtest_check_sorted_stream.cpp | 103 +++++-- src/DataStreams/ya.make | 3 - src/Interpreters/MergeJoin.cpp | 6 +- src/Interpreters/MutationsInterpreter.cpp | 14 +- src/Interpreters/SortedBlocksWriter.cpp | 135 ++++++--- src/Interpreters/SortedBlocksWriter.h | 7 +- src/Processors/Sources/BlocksListSource.h | 47 +++ .../Sources}/BlocksSource.h | 0 .../Transforms/CheckSortedTransform.cpp} | 37 ++- .../Transforms/CheckSortedTransform.h} | 15 +- src/Processors/ya.make | 1 + src/Storages/LiveView/StorageLiveView.cpp | 2 +- 21 files changed, 313 insertions(+), 748 deletions(-) delete mode 100644 src/DataStreams/BlocksListBlockInputStream.h delete mode 100644 src/DataStreams/LimitBlockInputStream.cpp delete mode 100644 src/DataStreams/LimitBlockInputStream.h delete mode 100644 src/DataStreams/MergingSortedBlockInputStream.cpp delete mode 100644 src/DataStreams/MergingSortedBlockInputStream.h create mode 100644 src/Processors/Sources/BlocksListSource.h rename src/{DataStreams => Processors/Sources}/BlocksSource.h (100%) rename src/{DataStreams/CheckSortedBlockInputStream.cpp => Processors/Transforms/CheckSortedTransform.cpp} (68%) rename src/{DataStreams/CheckSortedBlockInputStream.h => Processors/Transforms/CheckSortedTransform.h} (64%) diff --git a/programs/obfuscator/Obfuscator.cpp b/programs/obfuscator/Obfuscator.cpp index ecefcac1faf..b1acc34ef93 100644 --- a/programs/obfuscator/Obfuscator.cpp +++ b/programs/obfuscator/Obfuscator.cpp @@ -15,8 +15,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include @@ -24,6 +24,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -1156,17 +1160,20 @@ try if (!silent) std::cerr << "Training models\n"; - BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size); + Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size)); - input->readPrefix(); - while (Block block = input->read()) + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); + PullingPipelineExecutor executor(pipeline); + + Block block; + while (executor.pull(block)) { obfuscator.train(block.getColumns()); source_rows += block.rows(); if (!silent) std::cerr << "Processed " << source_rows << " rows\n"; } - input->readSuffix(); } obfuscator.finalize(); @@ -1183,15 +1190,26 @@ try file_in.seek(0, SEEK_SET); - BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size); - BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header); + Pipe pipe(FormatFactory::instance().getInput(input_format, file_in, header, context, max_block_size)); if (processed_rows + source_rows > limit) - input = std::make_shared(input, limit - processed_rows, 0); + { + pipe.addSimpleTransform([&](const Block & cur_header) + { + return std::make_shared(cur_header, limit - processed_rows, 0); + }); + } + + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); + + BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header); + + PullingPipelineExecutor executor(pipeline); - input->readPrefix(); output->writePrefix(); - while (Block block = input->read()) + Block block; + while (executor.pull(block)) { Columns columns = obfuscator.generate(block.getColumns()); output->write(header.cloneWithColumns(columns)); @@ -1200,7 +1218,6 @@ try std::cerr << "Processed " << processed_rows << " rows\n"; } output->writeSuffix(); - input->readSuffix(); obfuscator.updateSeed(); } diff --git a/src/Access/ya.make b/src/Access/ya.make index e8584230538..59fb504c6ab 100644 --- a/src/Access/ya.make +++ b/src/Access/ya.make @@ -45,6 +45,7 @@ SRCS( SettingsProfilesCache.cpp User.cpp UsersConfigAccessStorage.cpp + tests/gtest_access_rights_ops.cpp ) diff --git a/src/DataStreams/BlocksListBlockInputStream.h b/src/DataStreams/BlocksListBlockInputStream.h deleted file mode 100644 index de287c2dc5e..00000000000 --- a/src/DataStreams/BlocksListBlockInputStream.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include - - -namespace DB -{ - -/** A stream of blocks from which you can read the next block from an explicitly provided list. - * Also see OneBlockInputStream. - */ -class BlocksListBlockInputStream : public IBlockInputStream -{ -public: - /// Acquires the ownership of the block list. - BlocksListBlockInputStream(BlocksList && list_) - : list(std::move(list_)), it(list.begin()), end(list.end()) {} - - /// Uses a list of blocks lying somewhere else. - BlocksListBlockInputStream(BlocksList::iterator & begin_, BlocksList::iterator & end_) - : it(begin_), end(end_) {} - - String getName() const override { return "BlocksList"; } - -protected: - Block getHeader() const override { return list.empty() ? Block() : *list.begin(); } - - Block readImpl() override - { - if (it == end) - return Block(); - - Block res = *it; - ++it; - return res; - } - -private: - BlocksList list; - BlocksList::iterator it; - const BlocksList::iterator end; -}; - -} diff --git a/src/DataStreams/LimitBlockInputStream.cpp b/src/DataStreams/LimitBlockInputStream.cpp deleted file mode 100644 index 5e262e921e8..00000000000 --- a/src/DataStreams/LimitBlockInputStream.cpp +++ /dev/null @@ -1,158 +0,0 @@ -#include - -#include - - -namespace DB -{ - -/// gets pointers to all columns of block, which were used for ORDER BY -static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description) -{ - size_t size = description.size(); - ColumnRawPtrs res; - res.reserve(size); - - for (size_t i = 0; i < size; ++i) - { - const IColumn * column = !description[i].column_name.empty() - ? block.getByName(description[i].column_name).column.get() - : block.safeGetByPosition(description[i].column_number).column.get(); - res.emplace_back(column); - } - - return res; -} - - -LimitBlockInputStream::LimitBlockInputStream( - const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, - bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_) - : limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_) - , description(description_) -{ - if (use_limit_as_total_rows_approx) - { - addTotalRowsApprox(static_cast(limit)); - } - - children.push_back(input); -} - -Block LimitBlockInputStream::readImpl() -{ - Block res; - UInt64 rows = 0; - - /// pos >= offset + limit and all rows in the end of previous block were equal - /// to row at 'limit' position. So we check current block. - if (!ties_row_ref.empty() && pos >= offset + limit) - { - res = children.back()->read(); - rows = res.rows(); - - if (!res) - return res; - - SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); - ptr->sort_columns = extractSortColumns(*ptr, description); - - UInt64 len; - for (len = 0; len < rows; ++len) - { - SharedBlockRowRef current_row; - current_row.set(ptr, &ptr->sort_columns, len); - - if (current_row != ties_row_ref) - { - ties_row_ref.reset(); - break; - } - } - - if (len < rows) - { - for (size_t i = 0; i < ptr->columns(); ++i) - ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len); - } - - return *ptr; - } - - if (pos >= offset + limit) - { - if (!always_read_till_end) - return res; - else - { - while (children.back()->read()) - ; - return res; - } - } - - do - { - res = children.back()->read(); - if (!res) - return res; - rows = res.rows(); - pos += rows; - } while (pos <= offset); - - SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); - if (with_ties) - ptr->sort_columns = extractSortColumns(*ptr, description); - - /// give away the whole block - if (pos >= offset + rows && pos <= offset + limit) - { - /// Save rowref for last row, because probalbly next block begins with the same row. - if (with_ties && pos == offset + limit) - ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1); - return *ptr; - } - - /// give away a piece of the block - UInt64 start = std::max( - static_cast(0), - static_cast(offset) - static_cast(pos) + static_cast(rows)); - - UInt64 length = std::min( - static_cast(limit), std::min( - static_cast(pos) - static_cast(offset), - static_cast(limit) + static_cast(offset) - static_cast(pos) + static_cast(rows))); - - - /// check if other rows in current block equals to last one in limit - if (with_ties) - { - ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1); - - for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i) - { - SharedBlockRowRef current_row; - current_row.set(ptr, &ptr->sort_columns, i); - if (current_row == ties_row_ref) - ++length; - else - { - ties_row_ref.reset(); - break; - } - } - } - - if (length == rows) - return *ptr; - - for (size_t i = 0; i < ptr->columns(); ++i) - ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length); - - // TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed. - // It's crucial for streaming engines like Kafka. - - return *ptr; -} - -} diff --git a/src/DataStreams/LimitBlockInputStream.h b/src/DataStreams/LimitBlockInputStream.h deleted file mode 100644 index 112e5dddb0c..00000000000 --- a/src/DataStreams/LimitBlockInputStream.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include -#include -#include - - -namespace DB -{ - - -/** Implements the LIMIT relational operation. - */ -class LimitBlockInputStream : public IBlockInputStream -{ -public: - /** If always_read_till_end = false (by default), then after reading enough data, - * returns an empty block, and this causes the query to be canceled. - * If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases: - * when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server. - * If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats - * with_ties = true, when query has WITH TIES modifier. If so, description should be provided - * description lets us know which row we should check for equality - */ - LimitBlockInputStream( - const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, - bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false, - bool with_ties_ = false, const SortDescription & description_ = {}); - - String getName() const override { return "Limit"; } - - Block getHeader() const override { return children.at(0)->getHeader(); } - -protected: - Block readImpl() override; - -private: - UInt64 limit; - UInt64 offset; - UInt64 pos = 0; - bool always_read_till_end; - bool with_ties; - const SortDescription description; - SharedBlockRowRef ties_row_ref; -}; - -} diff --git a/src/DataStreams/MergingSortedBlockInputStream.cpp b/src/DataStreams/MergingSortedBlockInputStream.cpp deleted file mode 100644 index b7396a23d6a..00000000000 --- a/src/DataStreams/MergingSortedBlockInputStream.cpp +++ /dev/null @@ -1,273 +0,0 @@ -#include - -#include - -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - - -MergingSortedBlockInputStream::MergingSortedBlockInputStream( - const BlockInputStreams & inputs_, SortDescription description_, - size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) - : description(std::move(description_)), max_block_size(max_block_size_), limit(limit_), quiet(quiet_) - , source_blocks(inputs_.size()) - , cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_) - , log(&Poco::Logger::get("MergingSortedBlockInputStream")) -{ - children.insert(children.end(), inputs_.begin(), inputs_.end()); - header = children.at(0)->getHeader(); - num_columns = header.columns(); -} - -void MergingSortedBlockInputStream::init(MutableColumns & merged_columns) -{ - /// Read the first blocks, initialize the queue. - if (first) - { - first = false; - - for (size_t i = 0; i < source_blocks.size(); ++i) - { - Block & block = source_blocks[i]; - - if (block) - continue; - - block = children[i]->read(); - - const size_t rows = block.rows(); - - if (rows == 0) - continue; - - if (expected_block_size < rows) - expected_block_size = std::min(rows, max_block_size); - - cursors[i] = SortCursorImpl(block, description, i); - has_collation |= cursors[i].has_collation; - } - - if (has_collation) - queue_with_collation = SortingHeap(cursors); - else - queue_without_collation = SortingHeap(cursors); - } - - /// Let's check that all source blocks have the same structure. - for (const auto & block : source_blocks) - { - if (!block) - continue; - - assertBlocksHaveEqualStructure(block, header, getName()); - } - - merged_columns.resize(num_columns); - for (size_t i = 0; i < num_columns; ++i) - { - merged_columns[i] = header.safeGetByPosition(i).column->cloneEmpty(); - merged_columns[i]->reserve(expected_block_size); - } -} - - -Block MergingSortedBlockInputStream::readImpl() -{ - if (finished) - return {}; - - if (children.size() == 1) - return children[0]->read(); - - MutableColumns merged_columns; - - init(merged_columns); - if (merged_columns.empty()) - return {}; - - if (has_collation) - merge(merged_columns, queue_with_collation); - else - merge(merged_columns, queue_without_collation); - - return header.cloneWithColumns(std::move(merged_columns)); -} - - -template -void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, SortingHeap & queue) -{ - size_t order = current->order; - size_t size = cursors.size(); - - if (order >= size || &cursors[order] != current.impl) - throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); - - while (true) - { - source_blocks[order] = children[order]->read(); - - if (!source_blocks[order]) - { - queue.removeTop(); - break; - } - - if (source_blocks[order].rows()) - { - cursors[order].reset(source_blocks[order]); - queue.replaceTop(&cursors[order]); - break; - } - } -} - - -template -void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue) -{ - size_t merged_rows = 0; - - /** Increase row counters. - * Return true if it's time to finish generating the current data block. - */ - auto count_row_and_check_limit = [&, this]() - { - ++total_merged_rows; - if (limit && total_merged_rows == limit) - { - // std::cerr << "Limit reached\n"; - cancel(false); - finished = true; - return true; - } - - ++merged_rows; - return merged_rows >= max_block_size; - }; - - /// Take rows in required order and put them into `merged_columns`, while the number of rows are no more than `max_block_size` - while (queue.isValid()) - { - auto current = queue.current(); - - /** And what if the block is totally less or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. - */ - if (current->isFirst() - && (queue.size() == 1 - || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) - { -// std::cerr << "current block is totally less or equals\n"; - - /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function. - if (merged_rows != 0) - { - //std::cerr << "merged rows is non-zero\n"; - return; - } - - /// Actually, current->order stores source number (i.e. cursors[current->order] == current) - size_t source_num = current->order; - - if (source_num >= cursors.size()) - throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); - - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i] = IColumn::mutate(std::move(source_blocks[source_num].getByPosition(i).column)); - -// std::cerr << "copied columns\n"; - - merged_rows = merged_columns.at(0)->size(); - - /// Limit output - if (limit && total_merged_rows + merged_rows > limit) - { - merged_rows = limit - total_merged_rows; - for (size_t i = 0; i < num_columns; ++i) - { - auto & column = merged_columns[i]; - column = IColumn::mutate(column->cut(0, merged_rows)); - } - - cancel(false); - finished = true; - } - - /// Write order of rows for other columns - /// this data will be used in grather stream - if (out_row_sources_buf) - { - RowSourcePart row_source(source_num); - for (size_t i = 0; i < merged_rows; ++i) - out_row_sources_buf->write(row_source.data); - } - - //std::cerr << "fetching next block\n"; - - total_merged_rows += merged_rows; - fetchNextBlock(current, queue); - return; - } - -// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; -// std::cerr << "Inserting row\n"; - for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow()); - - if (out_row_sources_buf) - { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - RowSourcePart row_source(current->order); - out_row_sources_buf->write(row_source.data); - } - - if (!current->isLast()) - { -// std::cerr << "moving to next row\n"; - queue.next(); - } - else - { - /// We get the next block from the corresponding source, if there is one. -// std::cerr << "It was last row, fetching next block\n"; - fetchNextBlock(current, queue); - } - - if (count_row_and_check_limit()) - return; - } - - /// We have read all data. Ask children to cancel providing more data. - cancel(false); - finished = true; -} - - -void MergingSortedBlockInputStream::readSuffixImpl() -{ - if (quiet) - return; - - const BlockStreamProfileInfo & profile_info = getProfileInfo(); - double seconds = profile_info.total_stopwatch.elapsedSeconds(); - - if (!seconds) - LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", profile_info.blocks, profile_info.rows); - else - LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec", - profile_info.blocks, profile_info.rows, seconds, - profile_info.rows / seconds, - ReadableSize(profile_info.bytes / seconds)); -} - -} diff --git a/src/DataStreams/MergingSortedBlockInputStream.h b/src/DataStreams/MergingSortedBlockInputStream.h deleted file mode 100644 index 582b41ff3af..00000000000 --- a/src/DataStreams/MergingSortedBlockInputStream.h +++ /dev/null @@ -1,87 +0,0 @@ -#pragma once - -#include -#include - -#include - -#include - - -namespace Poco { class Logger; } - - -namespace DB -{ - -/** Merges several sorted streams into one sorted stream. - */ -class MergingSortedBlockInputStream : public IBlockInputStream -{ -public: - /** limit - if isn't 0, then we can produce only first limit rows in sorted order. - * out_row_sources - if isn't nullptr, then at the end of execution it should contain part numbers of each read row (and needed flag) - * quiet - don't log profiling info - */ - MergingSortedBlockInputStream( - const BlockInputStreams & inputs_, SortDescription description_, size_t max_block_size_, - UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); - - String getName() const override { return "MergingSorted"; } - - Block getHeader() const override { return header; } - -protected: - Block readImpl() override; - - void readSuffixImpl() override; - - /// Initializes the queue and the columns of next result block. - void init(MutableColumns & merged_columns); - - /// Gets the next block from the source corresponding to the `current`. - template - void fetchNextBlock(const TSortCursor & current, SortingHeap & queue); - - Block header; - - const SortDescription description; - const size_t max_block_size; - UInt64 limit; - UInt64 total_merged_rows = 0; - - bool first = true; - bool has_collation = false; - bool quiet = false; - - /// May be smaller or equal to max_block_size. To do 'reserve' for columns. - size_t expected_block_size = 0; - - /// Blocks currently being merged. - size_t num_columns = 0; - Blocks source_blocks; - - SortCursorImpls cursors; - - SortingHeap queue_without_collation; - SortingHeap queue_with_collation; - - /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) - /// If it is not nullptr then it should be populated during execution - WriteBuffer * out_row_sources_buf; - -private: - - /** We support two different cursors - with Collation and without. - * Templates are used instead of polymorphic SortCursor and calls to virtual functions. - */ - template - void merge(MutableColumns & merged_columns, TSortingHeap & queue); - - Poco::Logger * log; - - /// Read is finished. - bool finished = false; -}; - -} diff --git a/src/DataStreams/TemporaryFileStream.h b/src/DataStreams/TemporaryFileStream.h index ce9071801d0..ec38f6c1baa 100644 --- a/src/DataStreams/TemporaryFileStream.h +++ b/src/DataStreams/TemporaryFileStream.h @@ -4,6 +4,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -32,32 +35,38 @@ struct TemporaryFileStream {} /// Flush data from input stream into file for future reading - static void write(const std::string & path, const Block & header, IBlockInputStream & input, - std::atomic * is_cancelled, const std::string & codec) + static void write(const std::string & path, const Block & header, QueryPipeline pipeline, const std::string & codec) { WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {})); NativeBlockOutputStream output(compressed_buf, 0, header); - copyData(input, output, is_cancelled); + + PullingPipelineExecutor executor(pipeline); + + output.writePrefix(); + + Block block; + while (executor.pull(block)) + output.write(block); + + output.writeSuffix(); compressed_buf.finalize(); } }; -class TemporaryFileLazyInputStream : public IBlockInputStream +class TemporaryFileLazySource : public ISource { public: - TemporaryFileLazyInputStream(const std::string & path_, const Block & header_) - : path(path_) - , header(header_) + TemporaryFileLazySource(const std::string & path_, const Block & header_) + : ISource(header_) + , path(path_) , done(false) {} - String getName() const override { return "TemporaryFile"; } - Block getHeader() const override { return header; } - void readSuffix() override {} + String getName() const override { return "TemporaryFileLazySource"; } protected: - Block readImpl() override + Chunk generate() override { if (done) return {}; @@ -71,7 +80,7 @@ protected: done = true; stream.reset(); } - return block; + return Chunk(block.getColumns(), block.rows()); } private: diff --git a/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp b/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp index 0ce450c4e6c..ed1ac6adbff 100644 --- a/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp +++ b/src/DataStreams/tests/gtest_blocks_size_merging_streams.cpp @@ -1,11 +1,10 @@ #include #include #include -#include +#include #include #include #include -#include #include #include #include @@ -40,7 +39,7 @@ static Pipe getInputStreams(const std::vector & column_names, const size_t start = stride; while (blocks_count--) blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start)); - pipes.emplace_back(std::make_shared(std::make_shared(std::move(blocks)))); + pipes.emplace_back(std::make_shared(std::move(blocks))); } return Pipe::unitePipes(std::move(pipes)); @@ -57,7 +56,7 @@ static Pipe getInputStreamsEqualStride(const std::vector & column_n size_t start = i; while (blocks_count--) blocks.push_back(getBlockWithSize(column_names, block_size_in_bytes, stride, start)); - pipes.emplace_back(std::make_shared(std::make_shared(std::move(blocks)))); + pipes.emplace_back(std::make_shared(std::move(blocks))); i++; } return Pipe::unitePipes(std::move(pipes)); diff --git a/src/DataStreams/tests/gtest_check_sorted_stream.cpp b/src/DataStreams/tests/gtest_check_sorted_stream.cpp index 0c5cc6d58e1..72de5008e0b 100644 --- a/src/DataStreams/tests/gtest_check_sorted_stream.cpp +++ b/src/DataStreams/tests/gtest_check_sorted_stream.cpp @@ -2,8 +2,10 @@ #include #include -#include -#include +#include +#include +#include +#include #include @@ -89,14 +91,22 @@ TEST(CheckSortedBlockInputStream, CheckGoodCase) for (size_t i = 0; i < 3; ++i) blocks.push_back(getSortedBlockWithSize(key_columns, 10, 1, i * 10)); - BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + Pipe pipe(std::make_shared(std::move(blocks))); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, sort_description); + }); - CheckSortedBlockInputStream sorted(stream, sort_description); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); - EXPECT_NO_THROW(sorted.read()); - EXPECT_NO_THROW(sorted.read()); - EXPECT_NO_THROW(sorted.read()); - EXPECT_EQ(sorted.read(), Block()); + PullingPipelineExecutor executor(pipeline); + + Chunk chunk; + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_TRUE(executor.pull(chunk)); } TEST(CheckSortedBlockInputStream, CheckBadLastRow) @@ -109,14 +119,21 @@ TEST(CheckSortedBlockInputStream, CheckBadLastRow) blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 0)); blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 300)); - BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + Pipe pipe(std::make_shared(std::move(blocks))); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, sort_description); + }); - CheckSortedBlockInputStream sorted(stream, sort_description); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); + PullingPipelineExecutor executor(pipeline); - EXPECT_NO_THROW(sorted.read()); - EXPECT_NO_THROW(sorted.read()); - EXPECT_THROW(sorted.read(), DB::Exception); + Chunk chunk; + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_THROW(executor.pull(chunk), DB::Exception); } @@ -127,11 +144,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1) BlocksList blocks; blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 5, 1, 77)); - BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + Pipe pipe(std::make_shared(std::move(blocks))); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, sort_description); + }); - CheckSortedBlockInputStream sorted(stream, sort_description); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); - EXPECT_THROW(sorted.read(), DB::Exception); + PullingPipelineExecutor executor(pipeline); + + Chunk chunk; + EXPECT_THROW(executor.pull(chunk), DB::Exception); } TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2) @@ -141,11 +166,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2) BlocksList blocks; blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 99, 2, 77)); - BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + Pipe pipe(std::make_shared(std::move(blocks))); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, sort_description); + }); - CheckSortedBlockInputStream sorted(stream, sort_description); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); - EXPECT_THROW(sorted.read(), DB::Exception); + PullingPipelineExecutor executor(pipeline); + + Chunk chunk; + EXPECT_THROW(executor.pull(chunk), DB::Exception); } TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3) @@ -155,11 +188,19 @@ TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3) BlocksList blocks; blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 50, 0, 77)); - BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + Pipe pipe(std::make_shared(std::move(blocks))); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, sort_description); + }); - CheckSortedBlockInputStream sorted(stream, sort_description); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); - EXPECT_THROW(sorted.read(), DB::Exception); + PullingPipelineExecutor executor(pipeline); + + Chunk chunk; + EXPECT_THROW(executor.pull(chunk), DB::Exception); } TEST(CheckSortedBlockInputStream, CheckEqualBlock) @@ -171,11 +212,19 @@ TEST(CheckSortedBlockInputStream, CheckEqualBlock) blocks.push_back(getEqualValuesBlockWithSize(key_columns, 10)); blocks.push_back(getEqualValuesBlockWithSize(key_columns, 1)); - BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + Pipe pipe(std::make_shared(std::move(blocks))); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, sort_description); + }); - CheckSortedBlockInputStream sorted(stream, sort_description); + QueryPipeline pipeline; + pipeline.init(std::move(pipe)); - EXPECT_NO_THROW(sorted.read()); - EXPECT_NO_THROW(sorted.read()); - EXPECT_NO_THROW(sorted.read()); + PullingPipelineExecutor executor(pipeline); + + Chunk chunk; + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_NO_THROW(executor.pull(chunk)); + EXPECT_NO_THROW(executor.pull(chunk)); } diff --git a/src/DataStreams/ya.make b/src/DataStreams/ya.make index e6534ebc2f7..5c4910eff3d 100644 --- a/src/DataStreams/ya.make +++ b/src/DataStreams/ya.make @@ -19,7 +19,6 @@ SRCS( BlockIO.cpp BlockStreamProfileInfo.cpp CheckConstraintsBlockOutputStream.cpp - CheckSortedBlockInputStream.cpp ColumnGathererStream.cpp ConvertingBlockInputStream.cpp CountingBlockOutputStream.cpp @@ -30,9 +29,7 @@ SRCS( ITTLAlgorithm.cpp InputStreamFromASTInsertQuery.cpp InternalTextLogsRowOutputStream.cpp - LimitBlockInputStream.cpp MaterializingBlockInputStream.cpp - MergingSortedBlockInputStream.cpp MongoDBBlockInputStream.cpp NativeBlockInputStream.cpp NativeBlockOutputStream.cpp diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 8f9d94b6079..df9e81f5e18 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -9,11 +9,10 @@ #include #include #include -#include +#include #include #include #include -#include namespace DB @@ -518,8 +517,7 @@ void MergeJoin::mergeInMemoryRightBlocks() if (right_blocks.empty()) return; - auto stream = std::make_shared(std::move(right_blocks.blocks)); - Pipe source(std::make_shared(std::move(stream))); + Pipe source(std::make_shared(std::move(right_blocks.blocks))); right_blocks.clear(); QueryPipeline pipeline; diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 03a2a4da1d1..fe0594bb58f 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include #include @@ -901,12 +901,18 @@ BlockInputStreamPtr MutationsInterpreter::execute() select_interpreter->buildQueryPlan(plan); auto pipeline = addStreamsForLaterStages(stages, plan); - BlockInputStreamPtr result_stream = std::make_shared(std::move(*pipeline)); /// Sometimes we update just part of columns (for example UPDATE mutation) /// in this case we don't read sorting key, so just we don't check anything. - if (auto sort_desc = getStorageSortDescriptionIfPossible(result_stream->getHeader())) - result_stream = std::make_shared(result_stream, *sort_desc); + if (auto sort_desc = getStorageSortDescriptionIfPossible(pipeline->getHeader())) + { + pipeline->addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, *sort_desc); + }); + } + + BlockInputStreamPtr result_stream = std::make_shared(std::move(*pipeline)); if (!updated_header) updated_header = std::make_unique(result_stream->getHeader()); diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index e56c355852f..281f9020a91 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -1,7 +1,9 @@ #include #include -#include -#include +#include +#include +#include +#include #include #include #include @@ -18,32 +20,33 @@ namespace ErrorCodes namespace { -std::unique_ptr flushToFile(const String & tmp_path, const Block & header, IBlockInputStream & stream, const String & codec) +std::unique_ptr flushToFile(const String & tmp_path, const Block & header, QueryPipeline pipeline, const String & codec) { auto tmp_file = createTemporaryFile(tmp_path); - std::atomic is_cancelled{false}; - TemporaryFileStream::write(tmp_file->path(), header, stream, &is_cancelled, codec); - if (is_cancelled) - throw Exception("Cannot flush MergeJoin data on disk. No space at " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); + TemporaryFileStream::write(tmp_file->path(), header, std::move(pipeline), codec); return tmp_file; } -SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, IBlockInputStream & stream, +SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, QueryPipeline pipeline, const String & codec, std::function callback = [](const Block &){}) { std::vector> files; + PullingPipelineExecutor executor(pipeline); - while (Block block = stream.read()) + Block block; + while (executor.pull(block)) { if (!block.rows()) continue; callback(block); - OneBlockInputStream block_stream(block); - auto tmp_file = flushToFile(tmp_path, header, block_stream, codec); + QueryPipeline one_block_pipeline; + Chunk chunk(block.getColumns(), block.rows()); + one_block_pipeline.init(Pipe(std::make_shared(block.cloneEmpty(), std::move(chunk)))); + auto tmp_file = flushToFile(tmp_path, header, std::move(one_block_pipeline), codec); files.emplace_back(std::move(tmp_file)); } @@ -119,23 +122,30 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc { const std::string path = getPath(); - if (blocks.empty()) + Pipes pipes; + pipes.reserve(blocks.size()); + for (const auto & block : blocks) + if (auto num_rows = block.rows()) + pipes.emplace_back(std::make_shared(block.cloneEmpty(), Chunk(block.getColumns(), num_rows))); + + if (pipes.empty()) return {}; - if (blocks.size() == 1) + QueryPipeline pipeline; + pipeline.init(Pipe::unitePipes(std::move(pipes))); + + if (pipeline.getNumStreams() > 1) { - OneBlockInputStream sorted_input(blocks.front()); - return flushToFile(path, sample_block, sorted_input, codec); + auto transform = std::make_shared( + pipeline.getHeader(), + pipeline.getNumStreams(), + sort_description, + rows_in_block); + + pipeline.addTransform(std::move(transform)); } - BlockInputStreams inputs; - inputs.reserve(blocks.size()); - for (const auto & block : blocks) - if (block.rows()) - inputs.push_back(std::make_shared(block)); - - MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); - return flushToFile(path, sample_block, sorted_input, codec); + return flushToFile(path, sample_block, std::move(pipeline), codec); } SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() @@ -158,8 +168,8 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() if (!blocks.empty()) files.emplace_back(flush(blocks)); - BlockInputStreams inputs; - inputs.reserve(num_files_for_merge); + Pipes pipes; + pipes.reserve(num_files_for_merge); /// Merge by parts to save memory. It's possible to exchange disk I/O and memory by num_files_for_merge. { @@ -170,13 +180,26 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() { for (const auto & file : files) { - inputs.emplace_back(streamFromFile(file)); + pipes.emplace_back(streamFromFile(file)); - if (inputs.size() == num_files_for_merge || &file == &files.back()) + if (pipes.size() == num_files_for_merge || &file == &files.back()) { - MergingSortedBlockInputStream sorted_input(inputs, sort_description, rows_in_block); - new_files.emplace_back(flushToFile(getPath(), sample_block, sorted_input, codec)); - inputs.clear(); + QueryPipeline pipeline; + pipeline.init(Pipe::unitePipes(std::move(pipes))); + pipes = Pipes(); + + if (pipeline.getNumStreams() > 1) + { + auto transform = std::make_shared( + pipeline.getHeader(), + pipeline.getNumStreams(), + sort_description, + rows_in_block); + + pipeline.addTransform(std::move(transform)); + } + + new_files.emplace_back(flushToFile(getPath(), sample_block, std::move(pipeline), codec)); } } @@ -185,22 +208,35 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() } for (const auto & file : files) - inputs.emplace_back(streamFromFile(file)); + pipes.emplace_back(streamFromFile(file)); } - return PremergedFiles{std::move(files), std::move(inputs)}; + return PremergedFiles{std::move(files), Pipe::unitePipes(std::move(pipes))}; } SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function callback) { PremergedFiles files = premerge(); - MergingSortedBlockInputStream sorted_input(files.streams, sort_description, rows_in_block); - return flushToManyFiles(getPath(), sample_block, sorted_input, codec, callback); + QueryPipeline pipeline; + pipeline.init(std::move(files.pipe)); + + if (pipeline.getNumStreams() > 1) + { + auto transform = std::make_shared( + pipeline.getHeader(), + pipeline.getNumStreams(), + sort_description, + rows_in_block); + + pipeline.addTransform(std::move(transform)); + } + + return flushToManyFiles(getPath(), sample_block, std::move(pipeline), codec, callback); } -BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const +Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const { - return std::make_shared(file->path(), materializeBlock(sample_block)); + return Pipe(std::make_shared(file->path(), materializeBlock(sample_block))); } String SortedBlocksWriter::getPath() const @@ -250,18 +286,35 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const size_t num_rows = 0; { /// Merge sort blocks - BlockInputStreams inputs; - inputs.reserve(blocks.size()); + Pipes pipes; + pipes.reserve(blocks.size()); for (auto & block : blocks) { num_rows += block.rows(); - inputs.emplace_back(std::make_shared(block)); + Chunk chunk(block.getColumns(), block.rows()); + pipes.emplace_back(std::make_shared(block.cloneEmpty(), std::move(chunk))); } Blocks tmp_blocks; - MergingSortedBlockInputStream stream(inputs, sort_description, num_rows); - while (const auto & block = stream.read()) + + QueryPipeline pipeline; + pipeline.init(Pipe::unitePipes(std::move(pipes))); + + if (pipeline.getNumStreams() > 1) + { + auto transform = std::make_shared( + pipeline.getHeader(), + pipeline.getNumStreams(), + sort_description, + num_rows); + + pipeline.addTransform(std::move(transform)); + } + + PullingPipelineExecutor executor(pipeline); + Block block; + while (executor.pull(block)) tmp_blocks.emplace_back(block); blocks.swap(tmp_blocks); diff --git a/src/Interpreters/SortedBlocksWriter.h b/src/Interpreters/SortedBlocksWriter.h index b0488ec90c9..c65511e943e 100644 --- a/src/Interpreters/SortedBlocksWriter.h +++ b/src/Interpreters/SortedBlocksWriter.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,8 @@ class TableJoin; class MergeJoinCursor; struct MergeJoinEqualRange; +class Pipe; + class IVolume; using VolumePtr = std::shared_ptr; @@ -56,7 +59,7 @@ struct SortedBlocksWriter struct PremergedFiles { SortedFiles files; - BlockInputStreams streams; + Pipe pipe; }; static constexpr const size_t num_streams = 2; @@ -94,7 +97,7 @@ struct SortedBlocksWriter } String getPath() const; - BlockInputStreamPtr streamFromFile(const TmpFilePtr & file) const; + Pipe streamFromFile(const TmpFilePtr & file) const; void insert(Block && block); TmpFilePtr flush(const BlocksList & blocks) const; diff --git a/src/Processors/Sources/BlocksListSource.h b/src/Processors/Sources/BlocksListSource.h new file mode 100644 index 00000000000..e0388214f3e --- /dev/null +++ b/src/Processors/Sources/BlocksListSource.h @@ -0,0 +1,47 @@ +#pragma once + +#include + + +namespace DB +{ + +/** A stream of blocks from which you can read the next block from an explicitly provided list. + * Also see OneBlockInputStream. + */ +class BlocksListSource : public SourceWithProgress +{ +public: + /// Acquires the ownership of the block list. + explicit BlocksListSource(BlocksList && list_) + : SourceWithProgress(list_.empty() ? Block() : list_.front().cloneEmpty()) + , list(std::move(list_)), it(list.begin()), end(list.end()) {} + + /// Uses a list of blocks lying somewhere else. + BlocksListSource(BlocksList::iterator & begin_, BlocksList::iterator & end_) + : SourceWithProgress(begin_ == end_ ? Block() : begin_->cloneEmpty()) + , it(begin_), end(end_) {} + + String getName() const override { return "BlocksListSource"; } + +protected: + + Chunk generate() override + { + if (it == end) + return {}; + + Block res = *it; + ++it; + + size_t num_rows = res.rows(); + return Chunk(res.getColumns(), num_rows); + } + +private: + BlocksList list; + BlocksList::iterator it; + const BlocksList::iterator end; +}; + +} diff --git a/src/DataStreams/BlocksSource.h b/src/Processors/Sources/BlocksSource.h similarity index 100% rename from src/DataStreams/BlocksSource.h rename to src/Processors/Sources/BlocksSource.h diff --git a/src/DataStreams/CheckSortedBlockInputStream.cpp b/src/Processors/Transforms/CheckSortedTransform.cpp similarity index 68% rename from src/DataStreams/CheckSortedBlockInputStream.cpp rename to src/Processors/Transforms/CheckSortedTransform.cpp index 064c1b690b8..3d4518a935d 100644 --- a/src/DataStreams/CheckSortedBlockInputStream.cpp +++ b/src/Processors/Transforms/CheckSortedTransform.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -12,20 +12,20 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -CheckSortedBlockInputStream::CheckSortedBlockInputStream( - const BlockInputStreamPtr & input_, +CheckSortedTransform::CheckSortedTransform( + const Block & header_, const SortDescription & sort_description_) - : header(input_->getHeader()) + : ISimpleTransform(header_, header_, false) , sort_description_map(addPositionsToSortDescriptions(sort_description_)) { - children.push_back(input_); } SortDescriptionsWithPositions -CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescription & sort_description) +CheckSortedTransform::addPositionsToSortDescriptions(const SortDescription & sort_description) { SortDescriptionsWithPositions result; result.reserve(sort_description.size()); + const auto & header = getInputPort().getHeader(); for (SortColumnDescription description_copy : sort_description) { @@ -39,11 +39,11 @@ CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescriptio } -Block CheckSortedBlockInputStream::readImpl() +void CheckSortedTransform::transform(Chunk & chunk) { - Block block = children.back()->read(); - if (!block || block.rows() == 0) - return block; + size_t num_rows = chunk.getNumRows(); + if (num_rows == 0) + return; auto check = [this](const Columns & left, size_t left_index, const Columns & right, size_t right_index) { @@ -70,23 +70,20 @@ Block CheckSortedBlockInputStream::readImpl() } }; - auto block_columns = block.getColumns(); + const auto & chunk_columns = chunk.getColumns(); if (!last_row.empty()) - check(last_row, 0, block_columns, 0); + check(last_row, 0, chunk_columns, 0); - size_t rows = block.rows(); - for (size_t i = 1; i < rows; ++i) - check(block_columns, i - 1, block_columns, i); + for (size_t i = 1; i < num_rows; ++i) + check(chunk_columns, i - 1, chunk_columns, i); last_row.clear(); - for (size_t i = 0; i < block.columns(); ++i) + for (const auto & chunk_column : chunk_columns) { - auto column = block_columns[i]->cloneEmpty(); - column->insertFrom(*block_columns[i], rows - 1); + auto column = chunk_column->cloneEmpty(); + column->insertFrom(*chunk_column, num_rows - 1); last_row.emplace_back(std::move(column)); } - - return block; } } diff --git a/src/DataStreams/CheckSortedBlockInputStream.h b/src/Processors/Transforms/CheckSortedTransform.h similarity index 64% rename from src/DataStreams/CheckSortedBlockInputStream.h rename to src/Processors/Transforms/CheckSortedTransform.h index 42060befeeb..d1b13d22578 100644 --- a/src/DataStreams/CheckSortedBlockInputStream.h +++ b/src/Processors/Transforms/CheckSortedTransform.h @@ -1,5 +1,5 @@ #pragma once -#include +#include #include #include @@ -9,26 +9,23 @@ using SortDescriptionsWithPositions = std::vector; /// Streams checks that flow of blocks is sorted in the sort_description order /// Othrewise throws exception in readImpl function. -class CheckSortedBlockInputStream : public IBlockInputStream +class CheckSortedTransform : public ISimpleTransform { public: - CheckSortedBlockInputStream( - const BlockInputStreamPtr & input_, + CheckSortedTransform( + const Block & header_, const SortDescription & sort_description_); - String getName() const override { return "CheckingSorted"; } + String getName() const override { return "CheckSortedTransform"; } - Block getHeader() const override { return header; } protected: - Block readImpl() override; + void transform(Chunk & chunk) override; private: - Block header; SortDescriptionsWithPositions sort_description_map; Columns last_row; -private: /// Just checks, that all sort_descriptions has column_number SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description); }; diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 3e51d9a77af..68cfe4e5011 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -143,6 +143,7 @@ SRCS( Transforms/AggregatingInOrderTransform.cpp Transforms/AggregatingTransform.cpp Transforms/ArrayJoinTransform.cpp + Transforms/CheckSortedTransform.cpp Transforms/CopyTransform.cpp Transforms/CreatingSetsTransform.cpp Transforms/CubeTransform.cpp diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index f54abda6d7f..5f5ce8a4a37 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -16,7 +16,7 @@ limitations under the License. */ #include #include #include -#include +#include #include #include #include From cb50fd9521a53b3a73b5b0d43e2804f86b0d6138 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Thu, 22 Jul 2021 18:21:17 +0200 Subject: [PATCH 03/19] 01946_test_wrong_host_name_access: Clear DNS in the end Leaves a better env and avoids future errors in the logs --- tests/queries/0_stateless/01946_test_wrong_host_name_access.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/0_stateless/01946_test_wrong_host_name_access.sh b/tests/queries/0_stateless/01946_test_wrong_host_name_access.sh index 288a3438dc9..86e1fdd768f 100755 --- a/tests/queries/0_stateless/01946_test_wrong_host_name_access.sh +++ b/tests/queries/0_stateless/01946_test_wrong_host_name_access.sh @@ -16,3 +16,5 @@ ${CLICKHOUSE_CLIENT} --query "SELECT 1" --user dns_fail_1 --host ${MYHOSTNAME} ${CLICKHOUSE_CLIENT} --query "SELECT 2" --user dns_fail_2 --host ${MYHOSTNAME} ${CLICKHOUSE_CLIENT} --query "DROP USER IF EXISTS dns_fail_1, dns_fail_2" + +${CLICKHOUSE_CLIENT} --query "SYSTEM DROP DNS CACHE" From 00e208342142f6c38d5aeca571b58b13b5b860cd Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 23 Jul 2021 00:49:30 +0300 Subject: [PATCH 04/19] Fix event_time_microseconds for REMOVE_PART in system.part_log --- src/Storages/MergeTree/MergeTreeData.cpp | 27 ++++++++++--------- ...event_time_microseconds_part_log.reference | 1 + ...01686_event_time_microseconds_part_log.sql | 19 ++++++++++--- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index cfe62fefb06..a268468ea59 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -158,6 +158,16 @@ static void checkSampleExpression(const StorageInMemoryMetadata & metadata, bool ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); } +inline UInt64 time_in_microseconds(std::chrono::time_point timepoint) +{ + return std::chrono::duration_cast(timepoint.time_since_epoch()).count(); +} + +inline UInt64 time_in_seconds(std::chrono::time_point timepoint) +{ + return std::chrono::duration_cast(timepoint.time_since_epoch()).count(); +} + MergeTreeData::MergeTreeData( const StorageID & table_id_, const String & relative_data_path_, @@ -1246,7 +1256,11 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa PartLogElement part_log_elem; part_log_elem.event_type = PartLogElement::REMOVE_PART; - part_log_elem.event_time = time(nullptr); + + const auto time_now = std::chrono::system_clock::now(); + part_log_elem.event_time = time_in_seconds(time_now); + part_log_elem.event_time_microseconds = time_in_microseconds(time_now); + part_log_elem.duration_ms = 0; //-V1048 part_log_elem.database_name = table_id.database_name; @@ -4579,17 +4593,6 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const return true; } -inline UInt64 time_in_microseconds(std::chrono::time_point timepoint) -{ - return std::chrono::duration_cast(timepoint.time_since_epoch()).count(); -} - - -inline UInt64 time_in_seconds(std::chrono::time_point timepoint) -{ - return std::chrono::duration_cast(timepoint.time_since_epoch()).count(); -} - void MergeTreeData::writePartLog( PartLogElement::Type type, const ExecutionStatus & execution_status, diff --git a/tests/queries/0_stateless/01686_event_time_microseconds_part_log.reference b/tests/queries/0_stateless/01686_event_time_microseconds_part_log.reference index 9766475a418..79ebd0860f4 100644 --- a/tests/queries/0_stateless/01686_event_time_microseconds_part_log.reference +++ b/tests/queries/0_stateless/01686_event_time_microseconds_part_log.reference @@ -1 +1,2 @@ ok +ok diff --git a/tests/queries/0_stateless/01686_event_time_microseconds_part_log.sql b/tests/queries/0_stateless/01686_event_time_microseconds_part_log.sql index 4a653379ef1..6063be4d1da 100644 --- a/tests/queries/0_stateless/01686_event_time_microseconds_part_log.sql +++ b/tests/queries/0_stateless/01686_event_time_microseconds_part_log.sql @@ -10,16 +10,27 @@ ORDER BY key; INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000); +-- Check NewPart SYSTEM FLUSH LOGS; - WITH ( SELECT (event_time, event_time_microseconds) FROM system.part_log - WHERE "table" = 'table_with_single_pk' - AND "database" = currentDatabase() + WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'NewPart' ORDER BY event_time DESC LIMIT 1 ) AS time SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail'); -DROP TABLE IF EXISTS table_with_single_pk; +-- Now let's check RemovePart +TRUNCATE TABLE table_with_single_pk; +SYSTEM FLUSH LOGS; +WITH ( + SELECT (event_time, event_time_microseconds) + FROM system.part_log + WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'RemovePart' + ORDER BY event_time DESC + LIMIT 1 + ) AS time +SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail'); + +DROP TABLE table_with_single_pk; From dccc379d39d13a035610eff1e79e7da7c9d68006 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 23 Jul 2021 10:40:03 +0300 Subject: [PATCH 05/19] Fix use after free in AsyncDrain connection from S3Cluster --- src/Client/MultiplexedConnections.cpp | 14 ++++++++++++++ src/Client/MultiplexedConnections.h | 5 ++++- src/DataStreams/RemoteQueryExecutor.cpp | 24 ++++++++++++++++++++++-- src/DataStreams/RemoteQueryExecutor.h | 15 ++++++++++++--- src/Storages/StorageS3Cluster.cpp | 8 ++++---- src/Storages/StorageS3Cluster.h | 2 -- 6 files changed, 56 insertions(+), 12 deletions(-) diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index e298849ad54..fe3879fdd30 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -29,6 +29,20 @@ MultiplexedConnections::MultiplexedConnections(Connection & connection, const Se active_connection_count = 1; } + +MultiplexedConnections::MultiplexedConnections(std::shared_ptr connection_ptr_, const Settings & settings_, const ThrottlerPtr & throttler) + : settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout) + , connection_ptr(connection_ptr_) +{ + connection_ptr->setThrottler(throttler); + + ReplicaState replica_state; + replica_state.connection = connection_ptr.get(); + replica_states.push_back(replica_state); + + active_connection_count = 1; +} + MultiplexedConnections::MultiplexedConnections( std::vector && connections, const Settings & settings_, const ThrottlerPtr & throttler) : settings(settings_), drain_timeout(settings.drain_timeout), receive_timeout(settings.receive_timeout) diff --git a/src/Client/MultiplexedConnections.h b/src/Client/MultiplexedConnections.h index c42653b5f02..4fb7d496b0c 100644 --- a/src/Client/MultiplexedConnections.h +++ b/src/Client/MultiplexedConnections.h @@ -22,6 +22,8 @@ class MultiplexedConnections final : public IConnections public: /// Accepts ready connection. MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_); + /// Accepts ready connection and keep it alive before drain + MultiplexedConnections(std::shared_ptr connection_, const Settings & settings_, const ThrottlerPtr & throttler_); /// Accepts a vector of connections to replicas of one shard already taken from pool. MultiplexedConnections( @@ -79,7 +81,6 @@ private: /// Mark the replica as invalid. void invalidateReplica(ReplicaState & replica_state); -private: const Settings & settings; /// The following two fields are from settings but can be referenced outside the lifetime of @@ -95,6 +96,8 @@ private: /// Connection that received last block. Connection * current_connection = nullptr; + /// Shared connection, may be empty. Used to keep object alive before draining. + std::shared_ptr connection_ptr; bool sent_query = false; bool cancelled = false; diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index b8276d0d5c6..3607729737d 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -34,13 +34,20 @@ namespace ErrorCodes extern const int DUPLICATED_PART_UUIDS; } +RemoteQueryExecutor::RemoteQueryExecutor( + const String & query_, const Block & header_, ContextPtr context_, + const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_) + : header(header_), query(query_), context(context_), scalars(scalars_) + , external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_) +{} + RemoteQueryExecutor::RemoteQueryExecutor( Connection & connection, const String & query_, const Block & header_, ContextPtr context_, ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_) - : header(header_), query(query_), context(context_) - , scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), sync_draining(true) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, task_iterator_) { create_connections = [this, &connection, throttler]() { @@ -48,6 +55,19 @@ RemoteQueryExecutor::RemoteQueryExecutor( }; } +RemoteQueryExecutor::RemoteQueryExecutor( + std::shared_ptr connection_ptr, + const String & query_, const Block & header_, ContextPtr context_, + ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, task_iterator_) +{ + create_connections = [this, connection_ptr, throttler]() + { + return std::make_shared(connection_ptr, context->getSettingsRef(), throttler); + }; +} + RemoteQueryExecutor::RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool_, std::vector && connections_, diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h index 7b0a0fc0d2e..fbf6821a237 100644 --- a/src/DataStreams/RemoteQueryExecutor.h +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -43,6 +43,13 @@ public: ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr task_iterator_ = {}); + /// Takes already set connection. + RemoteQueryExecutor( + std::shared_ptr connection, + const String & query_, const Block & header_, ContextPtr context_, + ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr task_iterator_ = {}); + /// Accepts several connections already taken from pool. RemoteQueryExecutor( const ConnectionPoolWithFailoverPtr & pool, @@ -105,6 +112,11 @@ public: const Block & getHeader() const { return header; } private: + RemoteQueryExecutor( + const String & query_, const Block & header_, ContextPtr context_, + const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::shared_ptr task_iterator_); + Block header; Block totals; Block extremes; @@ -124,9 +136,6 @@ private: /// Initiator identifier for distributed task processing std::shared_ptr task_iterator; - /// Drain connection synchronously when finishing. - bool sync_draining = false; - std::function()> create_connections; /// Hold a shared reference to the connection pool so that asynchronous connection draining will /// work safely. Make sure it's the first member so that we don't destruct it too early. diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 726e3a3465f..8f2dfa591ca 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -106,7 +106,6 @@ Pipe StorageS3Cluster::read( const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; Pipes pipes; - connections.reserve(cluster->getShardCount()); const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; @@ -115,18 +114,19 @@ Pipe StorageS3Cluster::read( /// There will be only one replica, because we consider each replica as a shard for (const auto & node : replicas) { - connections.emplace_back(std::make_shared( + auto connection = std::make_shared( node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(), node.user, node.password, node.cluster, node.cluster_secret, "S3ClusterInititiator", node.compression, node.secure - )); + ); + /// For unknown reason global context is passed to IStorage::read() method /// So, task_identifier is passed as constructor argument. It is more obvious. auto remote_query_executor = std::make_shared( - *connections.back(), + connection, queryToString(query_info.query), header, context, diff --git a/src/Storages/StorageS3Cluster.h b/src/Storages/StorageS3Cluster.h index 821765a3780..81d82052d4d 100644 --- a/src/Storages/StorageS3Cluster.h +++ b/src/Storages/StorageS3Cluster.h @@ -50,8 +50,6 @@ protected: const String & compression_method_); private: - /// Connections from initiator to other nodes - std::vector> connections; StorageS3::ClientAuthentication client_auth; String filename; From 80e0e244484dbec713397af26eabef2fa165b9a3 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 23 Jul 2021 12:29:53 +0300 Subject: [PATCH 06/19] Fix unit test and style. --- src/DataStreams/tests/gtest_check_sorted_stream.cpp | 2 +- src/Interpreters/MergeJoin.cpp | 1 - src/Interpreters/SortedBlocksWriter.cpp | 5 ----- 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/DataStreams/tests/gtest_check_sorted_stream.cpp b/src/DataStreams/tests/gtest_check_sorted_stream.cpp index 72de5008e0b..2788c44389b 100644 --- a/src/DataStreams/tests/gtest_check_sorted_stream.cpp +++ b/src/DataStreams/tests/gtest_check_sorted_stream.cpp @@ -106,7 +106,7 @@ TEST(CheckSortedBlockInputStream, CheckGoodCase) EXPECT_NO_THROW(executor.pull(chunk)); EXPECT_NO_THROW(executor.pull(chunk)); EXPECT_NO_THROW(executor.pull(chunk)); - EXPECT_TRUE(executor.pull(chunk)); + EXPECT_FALSE(executor.pull(chunk)); } TEST(CheckSortedBlockInputStream, CheckBadLastRow) diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 31c911d21d1..f13b2bccc72 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index 281f9020a91..3ce9f2d1b90 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -12,11 +12,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int NOT_ENOUGH_SPACE; -} - namespace { From 52cc98e9c7dbc7c0710bcf8b6e2dc9728434b5b5 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 23 Jul 2021 13:55:28 +0300 Subject: [PATCH 07/19] Update MergeJoin.cpp --- src/Interpreters/MergeJoin.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index f13b2bccc72..b93a94b4215 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -11,8 +11,6 @@ #include #include #include -#include -#include #include #include #include From e724e972fe8455aa2197264b14d45e1ddd22d3ff Mon Sep 17 00:00:00 2001 From: Caspian Date: Fri, 23 Jul 2021 21:37:55 +0800 Subject: [PATCH 08/19] remove uncessary Exception --- src/Parsers/ASTRolesOrUsersSet.h | 2 -- src/Parsers/ParserGrantQuery.cpp | 35 +++---------------- src/Parsers/ParserRolesOrUsersSet.cpp | 19 +++------- .../01999_grant_with_replace.reference | 8 +++++ .../0_stateless/01999_grant_with_replace.sql | 17 ++++++++- 5 files changed, 33 insertions(+), 48 deletions(-) diff --git a/src/Parsers/ASTRolesOrUsersSet.h b/src/Parsers/ASTRolesOrUsersSet.h index ab2d9bb4a00..15d42ee39a0 100644 --- a/src/Parsers/ASTRolesOrUsersSet.h +++ b/src/Parsers/ASTRolesOrUsersSet.h @@ -25,8 +25,6 @@ public: bool id_mode = false; /// whether this set keep UUIDs instead of names bool use_keyword_any = false; /// whether the keyword ANY should be used instead of the keyword ALL - bool none_role_parsed = false; /// whether keyword NONE has been parsed - bool empty() const { return names.empty() && !current_user && !all; } void replaceCurrentUserTag(const String & current_user_name); diff --git a/src/Parsers/ParserGrantQuery.cpp b/src/Parsers/ParserGrantQuery.cpp index 5ea6ac0ec92..bd4b07bb7b3 100644 --- a/src/Parsers/ParserGrantQuery.cpp +++ b/src/Parsers/ParserGrantQuery.cpp @@ -293,41 +293,14 @@ bool ParserGrantQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) bool replace_access = false; + bool replace_role = false; if (is_replace) { if (roles) - { // assigning role mode - if (!roles->empty() && roles->none_role_parsed) - throw Exception("In assigning role WITH REPLACE OPTION sql, 'NONE' can only be used alone to rovoke all roles", ErrorCodes::SYNTAX_ERROR); - } + replace_role = true; else - { - // granting privilege mode replace_access = true; - bool new_access = false; - bool none_on_all = false; - for (auto & element : elements) - { - if (element.access_flags.isEmpty()) - { - if (element.any_database) - none_on_all = true; - else - throw Exception("In granting privilege WITH REPLACE OPTION sql, 'NONE ON db.*' should be 'NONE ON *.*', and can only be used alone to drop all privileges on any database", ErrorCodes::SYNTAX_ERROR); - } - else - { - new_access = true; - } - } - - if (new_access && none_on_all) - throw Exception("In granting privilege WITH REPLACE OPTION sql, 'NONE ON db.*' should be 'NONE ON *.*', and can only be used alone to drop all privileges on any database", ErrorCodes::SYNTAX_ERROR); - } - } - - if (is_replace && !replace_access && roles && !roles->empty() && roles->none_role_parsed) - throw Exception("In REPLACE GRANT assigning role sql, 'NONE' can only be used alone to rovoke all roles", ErrorCodes::SYNTAX_ERROR); + } if (!is_revoke) eraseNonGrantable(elements); @@ -343,7 +316,7 @@ bool ParserGrantQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->grantees = std::move(grantees); query->admin_option = admin_option; query->replace_access = replace_access; - query->replace_granted_roles = (is_replace && !replace_access); + query->replace_granted_roles = replace_role; return true; } diff --git a/src/Parsers/ParserRolesOrUsersSet.cpp b/src/Parsers/ParserRolesOrUsersSet.cpp index c5a459fd199..41e9ee6501d 100644 --- a/src/Parsers/ParserRolesOrUsersSet.cpp +++ b/src/Parsers/ParserRolesOrUsersSet.cpp @@ -44,22 +44,17 @@ namespace bool allow_current_user, bool & all, Strings & names, - bool & current_user, - bool & none_role_parsed) + bool & current_user) { bool res_all = false; Strings res_names; bool res_current_user = false; Strings res_with_roles_names; - bool parsed_none = false; auto parse_element = [&] { if (ParserKeyword{"NONE"}.ignore(pos, expected)) - { - parsed_none = true; return true; - } if (allow_all && ParserKeyword{"ALL"}.ignore(pos, expected)) { @@ -95,7 +90,6 @@ namespace names = std::move(res_names); current_user = res_current_user; all = res_all; - none_role_parsed = parsed_none; return true; } @@ -105,15 +99,14 @@ namespace bool id_mode, bool allow_current_user, Strings & except_names, - bool & except_current_user, - bool & parsed_none) + bool & except_current_user) { return IParserBase::wrapParseImpl(pos, [&] { if (!ParserKeyword{"EXCEPT"}.ignore(pos, expected)) return false; bool unused; - return parseBeforeExcept(pos, expected, id_mode, false, false, allow_current_user, unused, except_names, except_current_user, parsed_none); + return parseBeforeExcept(pos, expected, id_mode, false, false, allow_current_user, unused, except_names, except_current_user); }); } } @@ -126,12 +119,11 @@ bool ParserRolesOrUsersSet::parseImpl(Pos & pos, ASTPtr & node, Expected & expec bool current_user = false; Strings except_names; bool except_current_user = false; - bool parsed_none = false; - if (!parseBeforeExcept(pos, expected, id_mode, allow_all, allow_any, allow_current_user, all, names, current_user, parsed_none)) + if (!parseBeforeExcept(pos, expected, id_mode, allow_all, allow_any, allow_current_user, all, names, current_user)) return false; - parseExceptAndAfterExcept(pos, expected, id_mode, allow_current_user, except_names, except_current_user, parsed_none); + parseExceptAndAfterExcept(pos, expected, id_mode, allow_current_user, except_names, except_current_user); if (all) names.clear(); @@ -146,7 +138,6 @@ bool ParserRolesOrUsersSet::parseImpl(Pos & pos, ASTPtr & node, Expected & expec result->allow_roles = allow_roles; result->id_mode = id_mode; result->use_keyword_any = all && allow_any && !allow_all; - result->none_role_parsed = parsed_none; node = result; return true; } diff --git a/tests/queries/0_stateless/01999_grant_with_replace.reference b/tests/queries/0_stateless/01999_grant_with_replace.reference index 3aabc8bd384..9e089a05e52 100644 --- a/tests/queries/0_stateless/01999_grant_with_replace.reference +++ b/tests/queries/0_stateless/01999_grant_with_replace.reference @@ -26,3 +26,11 @@ GRANT test_role_01999 TO test_user_01999 K GRANT SHOW ON db8.* TO test_user_01999 L +GRANT SELECT ON db9.tb3 TO test_user_01999 +M +GRANT SELECT ON db9.tb3 TO test_user_01999 +GRANT test_role_01999 TO test_user_01999 +N +GRANT SELECT ON db9.tb3 TO test_user_01999 +GRANT test_role_01999_1 TO test_user_01999 +O diff --git a/tests/queries/0_stateless/01999_grant_with_replace.sql b/tests/queries/0_stateless/01999_grant_with_replace.sql index 9c3f68fff7e..31a9187c0d2 100644 --- a/tests/queries/0_stateless/01999_grant_with_replace.sql +++ b/tests/queries/0_stateless/01999_grant_with_replace.sql @@ -54,7 +54,22 @@ SELECT 'K'; GRANT NONE TO test_user_01999 WITH REPLACE OPTION; SHOW GRANTS FOR test_user_01999; +SELECT 'L'; +GRANT NONE ON *.*, SELECT on db9.tb3 TO test_user_01999 WITH REPLACE OPTION; +SHOW GRANTS FOR test_user_01999; + +SELECT 'M'; +GRANT test_role_01999 to test_user_01999; +SHOW GRANTS FOR test_user_01999; + +SELECT 'N'; +DROP ROLE IF EXISTS test_role_01999_1; +CREATE role test_role_01999_1; +GRANT NONE, test_role_01999_1 TO test_user_01999 WITH REPLACE OPTION; +SHOW GRANTS FOR test_user_01999; + DROP USER IF EXISTS test_user_01999; DROP ROLE IF EXISTS test_role_01999; +DROP ROLE IF EXISTS test_role_01999_1; -SELECT 'L'; +SELECT 'O'; From d1106b325e6b1baa2eb1715f9b3e5a6b252afa10 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 23 Jul 2021 17:35:22 +0300 Subject: [PATCH 09/19] Lock mutex before access to std::cerr in clickhouse-benchmark --- programs/benchmark/Benchmark.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index 859222c236e..d9a9629cb07 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -271,7 +271,8 @@ private: if (max_time > 0 && total_watch.elapsedSeconds() >= max_time) { - std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n"; + std::cout << "Stopping launch of queries." + << " Requested time limit " << max_time << " seconds is exhausted.\n"; return false; } @@ -368,8 +369,7 @@ private: { extracted = queue.tryPop(query, 100); - if (shutdown - || (max_iterations && queries_executed == max_iterations)) + if (shutdown || (max_iterations && queries_executed == max_iterations)) { return; } @@ -382,6 +382,7 @@ private: } catch (...) { + std::lock_guard lock(mutex); std::cerr << "An error occurred while processing the query '" << query << "'.\n"; if (!continue_on_errors) From e4f3b9e7f4e93f5653d687370030716e064bc405 Mon Sep 17 00:00:00 2001 From: vdimir Date: Fri, 23 Jul 2021 17:41:32 +0300 Subject: [PATCH 10/19] Log exception message in void thread in clickhouse-benchmark --- programs/benchmark/Benchmark.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/programs/benchmark/Benchmark.cpp b/programs/benchmark/Benchmark.cpp index d9a9629cb07..f0c1c239a6e 100644 --- a/programs/benchmark/Benchmark.cpp +++ b/programs/benchmark/Benchmark.cpp @@ -383,8 +383,8 @@ private: catch (...) { std::lock_guard lock(mutex); - std::cerr << "An error occurred while processing the query '" - << query << "'.\n"; + std::cerr << "An error occurred while processing the query " << "'" << query << "'" + << ": " << getCurrentExceptionMessage(false) << std::endl; if (!continue_on_errors) { shutdown = true; From 1dfa347e2050843be20aa2768d279975e0a6d500 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Fri, 23 Jul 2021 10:59:05 -0400 Subject: [PATCH 11/19] Update error message in tests/testflows/window_functions/tests/errors.py --- tests/testflows/window_functions/tests/errors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testflows/window_functions/tests/errors.py b/tests/testflows/window_functions/tests/errors.py index 0935c00885d..d7b80ed7cd8 100644 --- a/tests/testflows/window_functions/tests/errors.py +++ b/tests/testflows/window_functions/tests/errors.py @@ -44,8 +44,8 @@ def error_window_function_in_where(self): def error_window_function_in_join(self): """Check that trying to use window function in `JOIN` returns an error. """ - exitcode = 48 - message = "DB::Exception: JOIN ON inequalities are not supported. Unexpected 'row_number() OVER (ORDER BY salary ASC) < 10" + exitcode = 147 + message = "DB::Exception: Cannot get JOIN keys from JOIN ON section: row_number() OVER (ORDER BY salary ASC) < 10" sql = ("SELECT * FROM empsalary INNER JOIN tenk1 ON row_number() OVER (ORDER BY salary) < 10") From 421e59b9f508f26ecb8e4542cdf5750155cd77f5 Mon Sep 17 00:00:00 2001 From: Caspian Date: Fri, 23 Jul 2021 23:12:30 +0800 Subject: [PATCH 12/19] rm whitespace --- src/Parsers/ParserGrantQuery.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Parsers/ParserGrantQuery.cpp b/src/Parsers/ParserGrantQuery.cpp index bd4b07bb7b3..85a6c9c71d4 100644 --- a/src/Parsers/ParserGrantQuery.cpp +++ b/src/Parsers/ParserGrantQuery.cpp @@ -300,7 +300,7 @@ bool ParserGrantQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) replace_role = true; else replace_access = true; - } + } if (!is_revoke) eraseNonGrantable(elements); From 64c35b251154d700248c85e584a281539815cab7 Mon Sep 17 00:00:00 2001 From: "Mikhail f. Shiryaev" Date: Fri, 23 Jul 2021 21:29:40 +0200 Subject: [PATCH 13/19] Disable watchdog in docker by default --- docker/server/entrypoint.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker/server/entrypoint.sh b/docker/server/entrypoint.sh index f6e1d3c2402..40ba9f730cb 100755 --- a/docker/server/entrypoint.sh +++ b/docker/server/entrypoint.sh @@ -164,6 +164,10 @@ fi # if no args passed to `docker run` or first argument start with `--`, then the user is passing clickhouse-server arguments if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then + # Watchdog is launched by default, but does not send SIGINT to the main process, + # so the container can't be finished by ctrl+c + CLICKHOUSE_WATCHDOG_ENABLE=${CLICKHOUSE_WATCHDOG_ENABLE:-0} + export CLICKHOUSE_WATCHDOG_ENABLE exec $gosu /usr/bin/clickhouse-server --config-file="$CLICKHOUSE_CONFIG" "$@" fi From 249ccd879ea852ce0dfdc6974cdc92d54486ee17 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Fri, 23 Jul 2021 09:15:26 +0300 Subject: [PATCH 14/19] SET PROFILE applies constraints too. --- src/Access/SettingsProfilesInfo.cpp | 8 +++++++- tests/integration/test_settings_profile/test.py | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/Access/SettingsProfilesInfo.cpp b/src/Access/SettingsProfilesInfo.cpp index 72bd631c5f2..46ebb6084e7 100644 --- a/src/Access/SettingsProfilesInfo.cpp +++ b/src/Access/SettingsProfilesInfo.cpp @@ -32,7 +32,13 @@ SettingsProfilesInfo::getConstraintsAndProfileIDs(const std::shared_ptr(manager); res->current_profiles = profiles; - res->constraints = previous ? previous->constraints : constraints; + if (previous) + { + res->constraints = previous->constraints; + res->constraints.merge(constraints); + } + else + res->constraints = constraints; if (previous) { diff --git a/tests/integration/test_settings_profile/test.py b/tests/integration/test_settings_profile/test.py index 7e7d3ad80d7..7be0b395764 100644 --- a/tests/integration/test_settings_profile/test.py +++ b/tests/integration/test_settings_profile/test.py @@ -214,12 +214,15 @@ def test_show_profiles(): def test_set_profile(): - instance.query("CREATE SETTINGS PROFILE P1 SETTINGS max_memory_usage=10000000001") + instance.query("CREATE SETTINGS PROFILE P1 SETTINGS max_memory_usage=10000000001 MAX 20000000002") session_id = new_session_id() instance.http_query("SET profile='P1'", user='robin', params={'session_id':session_id}) assert instance.http_query("SELECT getSetting('max_memory_usage')", user='robin', params={'session_id':session_id}) == "10000000001\n" + expected_error = "max_memory_usage shouldn't be greater than 20000000002" + assert expected_error in instance.http_query_and_get_error("SET max_memory_usage=20000000003", user='robin', params={'session_id':session_id}) + def test_changing_default_profiles_affects_new_sessions_only(): instance.query("CREATE SETTINGS PROFILE P1 SETTINGS max_memory_usage=10000000001") From 0a4e26e68269161bff9fabc70de116e735542d40 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 24 Jul 2021 01:50:14 +0300 Subject: [PATCH 15/19] Experiment with sharing file descriptors --- src/IO/OpenedFile.cpp | 67 ++++++++++++++++++++++ src/IO/OpenedFile.h | 39 +++++++++++++ src/IO/OpenedFileCache.h | 74 +++++++++++++++++++++++++ src/IO/ReadBufferFromFile.cpp | 3 + src/IO/ReadBufferFromFile.h | 29 ++++++++++ src/IO/createReadBufferFromFileBase.cpp | 4 +- 6 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 src/IO/OpenedFile.cpp create mode 100644 src/IO/OpenedFile.h create mode 100644 src/IO/OpenedFileCache.h diff --git a/src/IO/OpenedFile.cpp b/src/IO/OpenedFile.cpp new file mode 100644 index 00000000000..6df21e836b4 --- /dev/null +++ b/src/IO/OpenedFile.cpp @@ -0,0 +1,67 @@ +#include +#include + +#include +#include +#include + + +namespace ProfileEvents +{ + extern const Event FileOpen; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_DOESNT_EXIST; + extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_CLOSE_FILE; +} + + +void OpenedFile::open(int flags) +{ + ProfileEvents::increment(ProfileEvents::FileOpen); + + fd = ::open(file_name.c_str(), (flags == -1 ? 0 : flags) | O_RDONLY | O_CLOEXEC); + + if (-1 == fd) + throwFromErrnoWithPath("Cannot open file " + file_name, file_name, + errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); +} + + +std::string OpenedFile::getFileName() const +{ + return file_name; +} + + +OpenedFile::OpenedFile(const std::string & file_name_, int flags) + : file_name(file_name_) +{ + open(flags); +} + + +OpenedFile::~OpenedFile() +{ + if (fd != -1) + close(); /// Exceptions will lead to std::terminate and that's Ok. +} + + +void OpenedFile::close() +{ + if (0 != ::close(fd)) + throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE); + + fd = -1; + metric_increment.destroy(); +} + +} + diff --git a/src/IO/OpenedFile.h b/src/IO/OpenedFile.h new file mode 100644 index 00000000000..8af0c83c363 --- /dev/null +++ b/src/IO/OpenedFile.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + + +namespace CurrentMetrics +{ + extern const Metric OpenFileForRead; +} + + +namespace DB +{ + +/// RAII for readonly opened file descriptor. +class OpenedFile +{ +public: + OpenedFile(const std::string & file_name_, int flags); + ~OpenedFile(); + + /// Close prematurally. + void close(); + + int getFD() const { return fd; } + std::string getFileName() const; + +private: + std::string file_name; + int fd = -1; + + CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead}; + + void open(int flags); +}; + +} + diff --git a/src/IO/OpenedFileCache.h b/src/IO/OpenedFileCache.h new file mode 100644 index 00000000000..3ae668c20e1 --- /dev/null +++ b/src/IO/OpenedFileCache.h @@ -0,0 +1,74 @@ +#pragma once + +#include +#include + +#include +#include +#include + + +namespace ProfileEvents +{ + extern const Event OpenedFileCacheHits; + extern const Event OpenedFileCacheMisses; +} + +namespace DB +{ + + +/** Cache of opened files for reading. + * It allows to share file descriptors when doing reading with 'pread' syscalls on readonly files. + * Note: open/close of files is very cheap on Linux and we should not bother doing it 10 000 times a second. + * (This may not be the case on Windows with WSL. This is also not the case if strace is active. Neither when some eBPF is loaded). + * But sometimes we may end up opening one file multiple times, that increases chance exhausting opened files limit. + */ +class OpenedFileCache +{ +private: + using Key = std::pair; + + using OpenedFileWeakPtr = std::weak_ptr; + using Files = std::map; + + Files files; + std::mutex mutex; + +public: + using OpenedFilePtr = std::shared_ptr; + + OpenedFilePtr get(const std::string & path, int flags) + { + Key key(path, flags); + + std::lock_guard lock(mutex); + + auto [it, inserted] = files.emplace(key, OpenedFilePtr{}); + if (!inserted) + if (auto res = it->second.lock()) + return res; + + OpenedFilePtr res + { + new OpenedFile(path, flags), + [key, this](auto ptr) + { + { + std::lock_guard another_lock(mutex); + files.erase(key); + } + delete ptr; + } + }; + + it->second = res; + return res; + } +}; + +using OpenedFileCachePtr = std::shared_ptr; + +} + + diff --git a/src/IO/ReadBufferFromFile.cpp b/src/IO/ReadBufferFromFile.cpp index d0f94441622..2d0d135f886 100644 --- a/src/IO/ReadBufferFromFile.cpp +++ b/src/IO/ReadBufferFromFile.cpp @@ -88,4 +88,7 @@ void ReadBufferFromFile::close() metric_increment.destroy(); } + +OpenedFileCache ReadBufferFromFilePReadWithCache::cache; + } diff --git a/src/IO/ReadBufferFromFile.h b/src/IO/ReadBufferFromFile.h index 676f53afeb8..5b4f997ce3d 100644 --- a/src/IO/ReadBufferFromFile.h +++ b/src/IO/ReadBufferFromFile.h @@ -1,12 +1,14 @@ #pragma once #include +#include #include #ifndef O_DIRECT #define O_DIRECT 00040000 #endif + namespace CurrentMetrics { extern const Metric OpenFileForRead; @@ -60,4 +62,31 @@ public: } }; + +/** Similar to ReadBufferFromFilePRead but also transparently shares open file descriptors. + */ +class ReadBufferFromFilePReadWithCache : public ReadBufferFromFileDescriptorPRead +{ +private: + static OpenedFileCache cache; + + std::string file_name; + OpenedFileCache::OpenedFilePtr file; + +public: + ReadBufferFromFilePReadWithCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1, + char * existing_memory = nullptr, size_t alignment = 0) + : ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment), + file_name(file_name_) + { + file = cache.get(file_name, flags); + fd = file->getFD(); + } + + std::string getFileName() const override + { + return file_name; + } +}; + } diff --git a/src/IO/createReadBufferFromFileBase.cpp b/src/IO/createReadBufferFromFileBase.cpp index 11a0937ee48..9675d89c0dd 100644 --- a/src/IO/createReadBufferFromFileBase.cpp +++ b/src/IO/createReadBufferFromFileBase.cpp @@ -75,7 +75,7 @@ std::unique_ptr createReadBufferFromFileBase( /// Attempt to open a file with O_DIRECT try { - auto res = std::make_unique( + auto res = std::make_unique( filename, buffer_size, (flags == -1 ? O_RDONLY | O_CLOEXEC : flags) | O_DIRECT, existing_memory, alignment); ProfileEvents::increment(ProfileEvents::CreatedReadBufferDirectIO); return res; @@ -92,7 +92,7 @@ std::unique_ptr createReadBufferFromFileBase( #endif ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary); - return std::make_unique(filename, buffer_size, flags, existing_memory, alignment); + return std::make_unique(filename, buffer_size, flags, existing_memory, alignment); } } From 2af02cc103c61b2b5606d9300bd65e2fcabd436c Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Sat, 24 Jul 2021 09:02:06 +0300 Subject: [PATCH 16/19] Auto version update to [21.9.1.7556] [54454] --- cmake/autogenerated_versions.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt index 18072566d04..ff2fd771c5e 100644 --- a/cmake/autogenerated_versions.txt +++ b/cmake/autogenerated_versions.txt @@ -6,7 +6,7 @@ SET(VERSION_REVISION 54454) SET(VERSION_MAJOR 21) SET(VERSION_MINOR 9) SET(VERSION_PATCH 1) -SET(VERSION_GITHASH f48c5af90c2ad51955d1ee3b6b05d006b03e4238) -SET(VERSION_DESCRIBE v21.9.1.1-prestable) -SET(VERSION_STRING 21.9.1.1) +SET(VERSION_GITHASH c4f997337d5ca8ae0f9fed829b34248d4ee4a4a4) +SET(VERSION_DESCRIBE v21.9.1.7556-prestable) +SET(VERSION_STRING 21.9.1.7556) # end of autochange From 9dd32ce30f6fae3dff3be687c53c0fcba88cd91a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 24 Jul 2021 13:11:40 +0300 Subject: [PATCH 17/19] Revert "Auto version update to [21.9.1.7556] [54454]" CC @akuzm, please fix your script. This reverts commit 2af02cc103c61b2b5606d9300bd65e2fcabd436c. --- cmake/autogenerated_versions.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt index ff2fd771c5e..18072566d04 100644 --- a/cmake/autogenerated_versions.txt +++ b/cmake/autogenerated_versions.txt @@ -6,7 +6,7 @@ SET(VERSION_REVISION 54454) SET(VERSION_MAJOR 21) SET(VERSION_MINOR 9) SET(VERSION_PATCH 1) -SET(VERSION_GITHASH c4f997337d5ca8ae0f9fed829b34248d4ee4a4a4) -SET(VERSION_DESCRIBE v21.9.1.7556-prestable) -SET(VERSION_STRING 21.9.1.7556) +SET(VERSION_GITHASH f48c5af90c2ad51955d1ee3b6b05d006b03e4238) +SET(VERSION_DESCRIBE v21.9.1.1-prestable) +SET(VERSION_STRING 21.9.1.1) # end of autochange From acb97c6f4a7b48f115c5b6cbd3a7ba52afbed6a0 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Sat, 24 Jul 2021 15:11:23 +0300 Subject: [PATCH 18/19] Update materialize-mysql.md --- docs/en/engines/database-engines/materialize-mysql.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/en/engines/database-engines/materialize-mysql.md b/docs/en/engines/database-engines/materialize-mysql.md index 198808fa952..83904ea0c01 100644 --- a/docs/en/engines/database-engines/materialize-mysql.md +++ b/docs/en/engines/database-engines/materialize-mysql.md @@ -5,6 +5,8 @@ toc_title: MaterializeMySQL # MaterializeMySQL {#materialize-mysql} +**This is experimental feature that should not be used in production.** + Creates ClickHouse database with all the tables existing in MySQL, and all the data in those tables. ClickHouse server works as MySQL replica. It reads binlog and performs DDL and DML queries. From 2033cb610d33a152bc09e7888caefdfa9d00122e Mon Sep 17 00:00:00 2001 From: Ivan Blinkov Date: Sat, 24 Jul 2021 16:36:26 +0300 Subject: [PATCH 19/19] Remove past event --- README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/README.md b/README.md index 71a35b8e2ae..496a6357f44 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,3 @@ ClickHouse® is an open-source column-oriented database management system that a * [Code Browser](https://clickhouse.tech/codebrowser/html_report/ClickHouse/index.html) with syntax highlight and navigation. * [Contacts](https://clickhouse.tech/#contacts) can help to get your questions answered if there are any. * You can also [fill this form](https://clickhouse.tech/#meet) to meet Yandex ClickHouse team in person. - -## Upcoming Events -* [ClickHouse Meetup by ByteDance (online)](https://www.meetup.com/ByteDanceDev-group/events/279543467/) on 23 July 2021.