From ca283a4bb88a097d15293ee0def550543fc279b2 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 16 Dec 2021 11:26:43 +0300 Subject: [PATCH 01/28] Fix optimization with lazy seek --- src/Common/ProfileEvents.cpp | 2 + ...chronousReadIndirectBufferFromRemoteFS.cpp | 40 +++++++++++++------ src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 34 ++++++++++------ src/Disks/IO/ReadBufferFromRemoteFSGather.h | 8 +++- .../IO/ReadIndirectBufferFromRemoteFS.cpp | 16 ++++---- src/Disks/IO/ThreadPoolRemoteFSReader.cpp | 8 ++-- src/Disks/IO/ThreadPoolRemoteFSReader.h | 4 +- ...ynchronousReadBufferFromFileDescriptor.cpp | 7 ++-- src/IO/AsynchronousReader.h | 5 ++- src/IO/SynchronousReader.cpp | 4 +- src/IO/ThreadPoolReader.cpp | 6 +-- 11 files changed, 83 insertions(+), 51 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 982523a3ef2..9c4f524a322 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -259,6 +259,8 @@ M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \ M(RemoteFSPrefetchedReads, "Number of reads from prefecthed buffer") \ M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \ + M(RemoteFSLazySeeks, "Number of lazy seeks") \ + M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \ M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \ \ M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \ diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 23fd353a5f0..107fb5fa059 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -21,6 +21,8 @@ namespace ProfileEvents extern const Event RemoteFSUnusedPrefetches; extern const Event RemoteFSPrefetchedReads; extern const Event RemoteFSUnprefetchedReads; + extern const Event RemoteFSLazySeeks; + extern const Event RemoteFSSeeksWithReset; extern const Event RemoteFSBuffers; } @@ -152,11 +154,14 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; Stopwatch watch; { - size = prefetch_future.get(); + size_t offset; + std::tie(size, offset) = prefetch_future.get(); + assert(offset < working_buffer.size()); if (size) { memory.swap(prefetch_buffer); - set(memory.data(), memory.size()); + size -= offset; + set(memory.data() + offset, size); working_buffer.resize(size); file_offset_of_buffer_end += size; } @@ -168,16 +173,22 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() else { ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); - size = readInto(memory.data(), memory.size()).get(); + size_t offset; + std::tie(size, offset) = readInto(memory.data(), memory.size()).get(); + assert(offset < working_buffer.size()); if (size) { - set(memory.data(), memory.size()); + size -= offset; + set(memory.data() + offset, size); working_buffer.resize(size); file_offset_of_buffer_end += size; } } + if (file_offset_of_buffer_end != impl->offset()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected equality {} == {}. 
It's a bug", file_offset_of_buffer_end, impl->offset()); + prefetch_future = {}; return size; } @@ -231,18 +242,23 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence pos = working_buffer.end(); - /// Note: we read in range [file_offset_of_buffer_end, read_until_position). - if (read_until_position && file_offset_of_buffer_end < *read_until_position - && static_cast(file_offset_of_buffer_end) >= getPosition() - && static_cast(file_offset_of_buffer_end) < getPosition() + static_cast(min_bytes_for_seek)) + /** + * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. + * Note: we read in range [file_offset_of_buffer_end, read_until_position). + */ + off_t file_offset_before_seek = impl->offset(); + if (impl->initialized() + && read_until_position && file_offset_of_buffer_end < *read_until_position + && static_cast(file_offset_of_buffer_end) > file_offset_before_seek + && static_cast(file_offset_of_buffer_end) < file_offset_before_seek + static_cast(min_bytes_for_seek)) { - /** - * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. - */ - bytes_to_ignore = file_offset_of_buffer_end - getPosition(); + ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks); + bytes_to_ignore = file_offset_of_buffer_end - file_offset_before_seek; + prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); } else { + ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset); impl->reset(); } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 534258eaca6..564e87098b7 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -65,7 +65,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata } -size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) +std::pair ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) { /** * Set `data` to current working and internal buffers. @@ -73,23 +73,24 @@ size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t o */ set(data, size); - absolute_position = offset; + file_offset_of_buffer_end = offset; bytes_to_ignore = ignore; + if (bytes_to_ignore) + assert(initialized()); auto result = nextImpl(); - bytes_to_ignore = 0; if (result) - return working_buffer.size(); + return {working_buffer.size(), BufferBase::offset()}; - return 0; + return {0, 0}; } void ReadBufferFromRemoteFSGather::initialize() { /// One clickhouse file can be split into multiple files in remote fs. - auto current_buf_offset = absolute_position; + auto current_buf_offset = file_offset_of_buffer_end; for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) { const auto & [file_path, size] = metadata.remote_fs_objects[i]; @@ -144,7 +145,6 @@ bool ReadBufferFromRemoteFSGather::nextImpl() return readImpl(); } - bool ReadBufferFromRemoteFSGather::readImpl() { swap(*current_buf); @@ -155,15 +155,26 @@ bool ReadBufferFromRemoteFSGather::readImpl() * we save how many bytes need to be ignored (new_offset - position() bytes). */ if (bytes_to_ignore) + { current_buf->ignore(bytes_to_ignore); + bytes_to_ignore = 0; + } - auto result = current_buf->next(); + bool result = current_buf->hasPendingData(); + if (result) + { + /// bytes_to_ignore already added. 
+ file_offset_of_buffer_end += current_buf->available(); + } + else + { + result = current_buf->next(); + if (result) + file_offset_of_buffer_end += current_buf->buffer().size(); + } swap(*current_buf); - if (result) - absolute_position += working_buffer.size(); - return result; } @@ -180,7 +191,6 @@ void ReadBufferFromRemoteFSGather::reset() current_buf.reset(); } - String ReadBufferFromRemoteFSGather::getFileName() const { return canonical_path; diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index 045ab43850d..a6263a1ee5c 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -37,10 +37,14 @@ public: void setReadUntilPosition(size_t position) override; - size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + std::pair readInto(char * data, size_t size, size_t offset, size_t ignore = 0); size_t getFileSize() const; + size_t offset() const { return file_offset_of_buffer_end; } + + bool initialized() const { return current_buf != nullptr; } + protected: virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const = 0; @@ -57,7 +61,7 @@ private: size_t current_buf_idx = 0; - size_t absolute_position = 0; + size_t file_offset_of_buffer_end = 0; size_t bytes_to_ignore = 0; diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp index 112124d9fd7..c21a55d68ac 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -20,7 +20,7 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( off_t ReadIndirectBufferFromRemoteFS::getPosition() { - return impl->absolute_position - available(); + return impl->file_offset_of_buffer_end - available(); } @@ -35,29 +35,29 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) if (whence == SEEK_CUR) { /// If position within current working buffer - shift pos. - if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->absolute_position) + if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->file_offset_of_buffer_end) { pos += offset_; return getPosition(); } else { - impl->absolute_position += offset_; + impl->file_offset_of_buffer_end += offset_; } } else if (whence == SEEK_SET) { /// If position within current working buffer - shift pos. 
if (!working_buffer.empty() - && size_t(offset_) >= impl->absolute_position - working_buffer.size() - && size_t(offset_) < impl->absolute_position) + && size_t(offset_) >= impl->file_offset_of_buffer_end - working_buffer.size() + && size_t(offset_) < impl->file_offset_of_buffer_end) { - pos = working_buffer.end() - (impl->absolute_position - offset_); + pos = working_buffer.end() - (impl->file_offset_of_buffer_end - offset_); return getPosition(); } else { - impl->absolute_position = offset_; + impl->file_offset_of_buffer_end = offset_; } } else @@ -66,7 +66,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) impl->reset(); pos = working_buffer.end(); - return impl->absolute_position; + return impl->file_offset_of_buffer_end; } diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 945b2d3eb7e..69961c9b7bd 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -28,7 +28,7 @@ namespace CurrentMetrics namespace DB { -size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) +std::pair ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) { return reader->readInto(data, size, offset, ignore); } @@ -44,18 +44,18 @@ std::future ThreadPoolRemoteFSReader::submit(Reques { auto task = std::make_shared>([request] { - setThreadName("ThreadPoolRemoteFSRead"); + setThreadName("VFSRead"); CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; auto * remote_fs_fd = assert_cast(request.descriptor.get()); Stopwatch watch(CLOCK_MONOTONIC); - auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); + auto [bytes_read, offset] = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); watch.stop(); ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read); - return bytes_read; + return std::make_pair(bytes_read, offset); }); auto future = task->get_future(); diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.h b/src/Disks/IO/ThreadPoolRemoteFSReader.h index c300162e214..1077e32ab0b 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.h +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.h @@ -28,9 +28,9 @@ public: struct ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor { public: - RemoteFSFileDescriptor(std::shared_ptr reader_) : reader(reader_) {} + explicit RemoteFSFileDescriptor(std::shared_ptr reader_) : reader(reader_) {} - size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + std::pair readInto(char * data, size_t size, size_t offset, size_t ignore = 0); private: std::shared_ptr reader; diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index b2be45471c8..501e82520b1 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -65,11 +65,11 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { /// Read request already in flight. Wait for its completion. 
- size_t size = 0; + size_t size = 0, offset = 0; { Stopwatch watch; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; - size = prefetch_future.get(); + std::tie(size, offset) = prefetch_future.get(); ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); } @@ -90,7 +90,7 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { /// No pending request. Do synchronous read. - auto size = readInto(memory.data(), memory.size()).get(); + auto [size, offset] = readInto(memory.data(), memory.size()).get(); file_offset_of_buffer_end += size; if (size) @@ -201,4 +201,3 @@ void AsynchronousReadBufferFromFileDescriptor::rewind() } } - diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index e4a81623205..157e6b9361c 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -49,10 +49,13 @@ public: size_t ignore = 0; }; + /// 1. size /// Less than requested amount of data can be returned. /// If size is zero - the file has ended. /// (for example, EINTR must be handled by implementation automatically) - using Result = size_t; + /// 2. offset + /// Optional. Useful when implementation needs to do ignore(). + using Result = std::pair; /// Submit request and obtain a handle. This method don't perform any waits. /// If this method did not throw, the caller must wait for the result with 'wait' method diff --git a/src/IO/SynchronousReader.cpp b/src/IO/SynchronousReader.cpp index 599299ddad4..d666d34fdcb 100644 --- a/src/IO/SynchronousReader.cpp +++ b/src/IO/SynchronousReader.cpp @@ -82,10 +82,8 @@ std::future SynchronousReader::submit(Request reque watch.stop(); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - return bytes_read; + return std::make_pair(bytes_read, static_cast(0)); }); } } - - diff --git a/src/IO/ThreadPoolReader.cpp b/src/IO/ThreadPoolReader.cpp index 32bc13ecb75..0041e881b25 100644 --- a/src/IO/ThreadPoolReader.cpp +++ b/src/IO/ThreadPoolReader.cpp @@ -117,7 +117,7 @@ std::future ThreadPoolReader::submit(Request reques if (!res) { /// The file has ended. 
- promise.set_value(0); + promise.set_value({0, 0}); watch.stop(); ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); @@ -176,7 +176,7 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - promise.set_value(bytes_read); + promise.set_value({bytes_read, 0}); return future; } } @@ -219,7 +219,7 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - return bytes_read; + return std::make_pair(bytes_read, static_cast(0)); }); auto future = task->get_future(); From 8199b24b8068f885a16a3cd10aae01779c4af1f3 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 17 Dec 2021 00:29:25 +0300 Subject: [PATCH 02/28] Add test --- tests/integration/test_merge_tree_s3/test.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 1e607e94119..46338b4316c 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -456,3 +456,12 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name): for i in range(30): print(f"Read sequence {i}") assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"] + + +@pytest.mark.parametrize("node_name", ["node"]) +def test_lazy_seek_optimization_for_async_read(cluster, node_name): + node = cluster.instances[node_name] + node.query("DROP TABLE IF EXISTS test") + node.query("CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';") + node.query("INSERT INTO test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 100000000") + assert int(node.query("SELECT count() FROM test")) == 100000000 From 40c266840c4968549dc17c6e15a7a2de52cdcc25 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 17 Dec 2021 12:00:22 +0300 Subject: [PATCH 03/28] Fixes --- ...chronousReadIndirectBufferFromRemoteFS.cpp | 15 +++++---- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 2 +- src/Disks/IO/ReadBufferFromRemoteFSGather.h | 8 ++++- src/Disks/IO/ThreadPoolRemoteFSReader.cpp | 5 ++- src/Disks/IO/ThreadPoolRemoteFSReader.h | 4 +-- ...ynchronousReadBufferFromFileDescriptor.cpp | 5 +-- src/IO/AsynchronousReader.h | 19 +++++++---- src/IO/SynchronousReader.cpp | 3 +- src/IO/ThreadPoolReader.cpp | 2 +- .../MergeTree/MergeTreeReaderCompact.cpp | 32 +++++++++++++++++-- .../MergeTree/MergeTreeReaderCompact.h | 6 ++++ tests/integration/test_merge_tree_s3/test.py | 4 +-- .../test_merge_tree_s3_failover/test.py | 1 - .../test_merge_tree_s3_restore/test.py | 1 + .../test_merge_tree_s3_with_cache/test.py | 1 - .../test.py | 1 - 16 files changed, 78 insertions(+), 31 deletions(-) diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 107fb5fa059..9cadc9a08a7 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -154,9 +154,11 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() 
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; Stopwatch watch; { - size_t offset; - std::tie(size, offset) = prefetch_future.get(); - assert(offset < working_buffer.size()); + auto result = prefetch_future.get(); + size = result.size; + auto offset = result.offset; + assert(offset < size); + if (size) { memory.swap(prefetch_buffer); @@ -173,9 +175,10 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() else { ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads); - size_t offset; - std::tie(size, offset) = readInto(memory.data(), memory.size()).get(); - assert(offset < working_buffer.size()); + auto result = readInto(memory.data(), memory.size()).get(); + size = result.size; + auto offset = result.offset; + assert(offset < size); if (size) { diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 564e87098b7..879724a7df3 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -65,7 +65,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata } -std::pair ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) +ReadBufferFromRemoteFSGather::ReadResult ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) { /** * Set `data` to current working and internal buffers. diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index a6263a1ee5c..00c8fbec670 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -37,7 +37,13 @@ public: void setReadUntilPosition(size_t position) override; - std::pair readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + struct ReadResult + { + size_t size = 0; + size_t offset = 0; + }; + + ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0); size_t getFileSize() const; diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 69961c9b7bd..4be55ff3ecf 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -8,7 +8,6 @@ #include #include -#include #include #include @@ -28,7 +27,7 @@ namespace CurrentMetrics namespace DB { -std::pair ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) +ReadBufferFromRemoteFSGather::ReadResult ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) { return reader->readInto(data, size, offset, ignore); } @@ -55,7 +54,7 @@ std::future ThreadPoolRemoteFSReader::submit(Reques ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read); - return std::make_pair(bytes_read, offset); + return Result{ .size = bytes_read, .offset = offset }; }); auto future = task->get_future(); diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.h b/src/Disks/IO/ThreadPoolRemoteFSReader.h index 1077e32ab0b..b2d5f11724a 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.h +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.h @@ -3,12 +3,12 @@ #include #include #include +#include #include namespace DB { -class ReadBufferFromRemoteFSGather; class ThreadPoolRemoteFSReader : public IAsynchronousReader { @@ -30,7 +30,7 @@ struct 
ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor public: explicit RemoteFSFileDescriptor(std::shared_ptr reader_) : reader(reader_) {} - std::pair readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + ReadBufferFromRemoteFSGather::ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0); private: std::shared_ptr reader; diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index 501e82520b1..995ed443428 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -65,11 +65,12 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { /// Read request already in flight. Wait for its completion. - size_t size = 0, offset = 0; + size_t size = 0; { Stopwatch watch; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; - std::tie(size, offset) = prefetch_future.get(); + auto result = prefetch_future.get(); + size = result.size; ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); } diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index 157e6b9361c..e79e72f3bec 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -49,13 +49,18 @@ public: size_t ignore = 0; }; - /// 1. size - /// Less than requested amount of data can be returned. - /// If size is zero - the file has ended. - /// (for example, EINTR must be handled by implementation automatically) - /// 2. offset - /// Optional. Useful when implementation needs to do ignore(). - using Result = std::pair; + struct Result + { + /// size + /// Less than requested amount of data can be returned. + /// If size is zero - the file has ended. + /// (for example, EINTR must be handled by implementation automatically) + size_t size = 0; + + /// offset + /// Optional. Useful when implementation needs to do ignore(). + size_t offset = 0; + }; /// Submit request and obtain a handle. This method don't perform any waits. 
/// If this method did not throw, the caller must wait for the result with 'wait' method diff --git a/src/IO/SynchronousReader.cpp b/src/IO/SynchronousReader.cpp index d666d34fdcb..4414da28d28 100644 --- a/src/IO/SynchronousReader.cpp +++ b/src/IO/SynchronousReader.cpp @@ -82,7 +82,8 @@ std::future SynchronousReader::submit(Request reque watch.stop(); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - return std::make_pair(bytes_read, static_cast(0)); + return Result{ .size = bytes_read, .offset = 0}; + }); } diff --git a/src/IO/ThreadPoolReader.cpp b/src/IO/ThreadPoolReader.cpp index 0041e881b25..63bc8fe7c49 100644 --- a/src/IO/ThreadPoolReader.cpp +++ b/src/IO/ThreadPoolReader.cpp @@ -219,7 +219,7 @@ std::future ThreadPoolReader::submit(Request reques ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); - return std::make_pair(bytes_read, static_cast(0)); + return Result{ .size = bytes_read, .offset = 0 }; }); auto future = task->get_future(); diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index 1f8642db886..4dd76673aef 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -199,6 +199,8 @@ void MergeTreeReaderCompact::readData( { const auto & [name, type] = name_and_type; + adjustUpperBound(current_task_last_mark); /// Must go before seek. + if (!isContinuousReading(from_mark, column_position)) seekToMark(from_mark, column_position); @@ -207,8 +209,6 @@ void MergeTreeReaderCompact::readData( if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes)) return nullptr; - /// For asynchronous reading from remote fs. - data_buffer->setReadUntilPosition(marks_loader.getMark(current_task_last_mark).offset_in_compressed_file); return data_buffer; }; @@ -269,6 +269,34 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index) } } +void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark) +{ + auto right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file; + if (!right_offset) + { + /// If already reading till the end of file. + if (last_right_offset && *last_right_offset == 0) + return; + + last_right_offset = 0; // Zero value means the end of file. + if (cached_buffer) + cached_buffer->setReadUntilEnd(); + if (non_cached_buffer) + non_cached_buffer->setReadUntilEnd(); + } + else + { + if (last_right_offset && right_offset <= last_right_offset.value()) + return; + + last_right_offset = right_offset; + if (cached_buffer) + cached_buffer->setReadUntilPosition(right_offset); + if (non_cached_buffer) + non_cached_buffer->setReadUntilPosition(right_offset); + } +} + bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position) { if (!last_read_granule) diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index 350c8427eff..381b212df3c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -52,6 +52,9 @@ private: /// Should we read full column or only it's offsets std::vector read_only_offsets; + /// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream. 
+ std::optional last_right_offset; + size_t next_mark = 0; std::optional> last_read_granule; @@ -67,6 +70,9 @@ private: MergeTreeMarksLoader & marks_loader, const ColumnPositions & column_positions, const MarkRanges & mark_ranges); + + /// For asynchronous reading from remote fs. + void adjustUpperBound(size_t last_mark); }; } diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 46338b4316c..766bb3bb159 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -463,5 +463,5 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name): node = cluster.instances[node_name] node.query("DROP TABLE IF EXISTS test") node.query("CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';") - node.query("INSERT INTO test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 100000000") - assert int(node.query("SELECT count() FROM test")) == 100000000 + node.query("INSERT INTO test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 20000") + node.query("SELECT * FROM test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10") diff --git a/tests/integration/test_merge_tree_s3_failover/test.py b/tests/integration/test_merge_tree_s3_failover/test.py index b6b47417523..44e7e0ae5ad 100644 --- a/tests/integration/test_merge_tree_s3_failover/test.py +++ b/tests/integration/test_merge_tree_s3_failover/test.py @@ -37,7 +37,6 @@ def fail_request(cluster, request): ["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)]) assert response == 'OK', 'Expected "OK", but got "{}"'.format(response) - def throttle_request(cluster, request): response = cluster.exec_in_container(cluster.get_container_id('resolver'), ["curl", "-s", "http://resolver:8080/throttle_request/{}".format(request)]) diff --git a/tests/integration/test_merge_tree_s3_restore/test.py b/tests/integration/test_merge_tree_s3_restore/test.py index babbea2beba..e12b69cdf17 100644 --- a/tests/integration/test_merge_tree_s3_restore/test.py +++ b/tests/integration/test_merge_tree_s3_restore/test.py @@ -7,6 +7,7 @@ import time import pytest from helpers.cluster import ClickHouseCluster, get_instances_dir + SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml'.format(get_instances_dir())) COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/clusters.xml"] diff --git a/tests/integration/test_merge_tree_s3_with_cache/test.py b/tests/integration/test_merge_tree_s3_with_cache/test.py index e15eaf61812..be3d2709873 100644 --- a/tests/integration/test_merge_tree_s3_with_cache/test.py +++ b/tests/integration/test_merge_tree_s3_with_cache/test.py @@ -36,7 +36,6 @@ def get_query_stat(instance, hint): result[ev[0]] = int(ev[1]) return result - @pytest.mark.parametrize("min_rows_for_wide_part,read_requests", [(0, 2), (8192, 1)]) def test_write_is_cached(cluster, min_rows_for_wide_part, read_requests): node = cluster.instances["node"] diff --git a/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py b/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py index 793abc53566..edf39969b47 100644 --- a/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py +++ b/tests/integration/test_replicated_merge_tree_s3_zero_copy/test.py @@ -65,7 +65,6 @@ def create_table(cluster, 
additional_settings=None): list(cluster.instances.values())[0].query(create_table_statement) - @pytest.fixture(autouse=True) def drop_table(cluster): yield From 0411fd656537290c1ef435389abaa3e552b28b37 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 19 Dec 2021 21:24:52 +0300 Subject: [PATCH 04/28] Better test --- tests/integration/test_merge_tree_s3/test.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 766bb3bb159..9e2e9a4b58b 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -461,7 +461,11 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name): @pytest.mark.parametrize("node_name", ["node"]) def test_lazy_seek_optimization_for_async_read(cluster, node_name): node = cluster.instances[node_name] - node.query("DROP TABLE IF EXISTS test") - node.query("CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';") - node.query("INSERT INTO test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 20000") - node.query("SELECT * FROM test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10") + node.query("DROP TABLE IF EXISTS s3_test NO DELAY") + node.query("CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';") + node.query("INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 9000000") + node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10") + node.query("DROP TABLE IF EXISTS s3_test NO DELAY") + minio = cluster.minio_client + for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')): + minio.remove_object(cluster.minio_bucket, obj.object_name) From 965e2bfb219653ada65f3135ea353acef58729aa Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 20 Dec 2021 08:43:07 +0300 Subject: [PATCH 05/28] Fix --- src/Interpreters/ExpressionAnalyzer.cpp | 12 ++++++++++++ .../02006_test_positional_arguments.reference | 2 ++ .../0_stateless/02006_test_positional_arguments.sql | 2 ++ 3 files changed, 16 insertions(+) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 9b343bec055..935f47f944c 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -183,6 +183,18 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q { if (ast_function->arguments) { + /// Check that all literal arguments are integers in advance to be able to always + /// throw exception at once if argument is out of position bounds. 
+ for (auto & arg : ast_function->arguments->children) + { + if (const auto * ast_literal = typeid_cast(arg.get())) + { + auto which = ast_literal->value.getType(); + if (which != Field::Types::UInt64) + return false; + } + } + for (auto & arg : ast_function->arguments->children) positional &= checkPositionalArguments(arg, select_query, expression); } diff --git a/tests/queries/0_stateless/02006_test_positional_arguments.reference b/tests/queries/0_stateless/02006_test_positional_arguments.reference index 7b75ab43430..dd4927c2e15 100644 --- a/tests/queries/0_stateless/02006_test_positional_arguments.reference +++ b/tests/queries/0_stateless/02006_test_positional_arguments.reference @@ -152,3 +152,5 @@ SELECT 1 + 1 AS a GROUP BY a select substr('aaaaaaaaaaaaaa', 8) as a group by a; aaaaaaa +select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8); +aaaaaaa diff --git a/tests/queries/0_stateless/02006_test_positional_arguments.sql b/tests/queries/0_stateless/02006_test_positional_arguments.sql index 3ba01b47efa..3dbbd42987c 100644 --- a/tests/queries/0_stateless/02006_test_positional_arguments.sql +++ b/tests/queries/0_stateless/02006_test_positional_arguments.sql @@ -52,3 +52,5 @@ select a, b, c, d, e, f from (select 44 a, 88 b, 13 c, 14 d, 15 e, 16 f) t grou explain syntax select plus(1, 1) as a group by a; select substr('aaaaaaaaaaaaaa', 8) as a group by a; +select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8); + From 62eb504029bf033f6772c81a928991d572c2d989 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 20 Dec 2021 22:43:37 +0300 Subject: [PATCH 06/28] Trying to add woboq --- .github/workflows/main.yml | 32 ++++++++++++ docker/test/codebrowser/Dockerfile | 2 +- tests/ci/codebrowser_check.py | 78 ++++++++++++++++++++++++++++++ tests/ci/s3_helper.py | 45 +++++++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 tests/ci/codebrowser_check.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4cda4eac33e..3f1ab153719 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -49,6 +49,38 @@ jobs: with: name: changed_images path: ${{ runner.temp }}/docker_images_check/changed_images.json + WoboqCodebrowser: + needs: CheckLabels + runs-on: [self-hosted, style-checker] + steps: + - name: Set envs + run: | + cat >> "$GITHUB_ENV" << 'EOF' + TEMP_PATH=${{runner.temp}}/codebrowser + REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse + EOF + - name: Clear repository + run: | + sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE + - name: Check out repository code + uses: actions/checkout@v2 + - name: Download changed images + uses: actions/download-artifact@v2 + with: + name: changed_images + path: ${{ env.TEMP_PATH }} + - name: Codebrowser + run: | + sudo rm -fr $TEMP_PATH + mkdir -p $TEMP_PATH + cp -r $GITHUB_WORKSPACE $TEMP_PATH + cd $REPO_COPY/tests/ci && python3 codebrowser_check.py + - name: Cleanup + if: always() + run: | + docker kill $(docker ps -q) ||: + docker rm -f $(docker ps -a -q) ||: + sudo rm -fr $TEMP_PATH StyleCheck: needs: DockerHubPush runs-on: [self-hosted, style-checker] diff --git a/docker/test/codebrowser/Dockerfile b/docker/test/codebrowser/Dockerfile index 25fabca67b5..dea102b3e4e 100644 --- a/docker/test/codebrowser/Dockerfile +++ b/docker/test/codebrowser/Dockerfile @@ -6,7 +6,7 @@ FROM clickhouse/binary-builder ARG apt_archive="http://archive.ubuntu.com" RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list -RUN apt-get update 
&& apt-get --yes --allow-unauthenticated install clang-9 libllvm9 libclang-9-dev +RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-13 libllvm13 libclang-13-dev # repo versions doesn't work correctly with C++17 # also we push reports to s3, so we add index.html to subfolder urls diff --git a/tests/ci/codebrowser_check.py b/tests/ci/codebrowser_check.py new file mode 100644 index 00000000000..3d4b4993a90 --- /dev/null +++ b/tests/ci/codebrowser_check.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 + + +import os +import subprocess +import logging + +from github import Github + +from stopwatch import Stopwatch +from upload_result_helper import upload_results +from s3_helper import S3Helper +from get_robot_token import get_best_robot_token +from pr_info import PRInfo +from commit_status_helper import post_commit_status +from docker_pull_helper import get_image_with_version +from tee_popen import TeePopen + +NAME = "Woboq Build (actions)" + +def get_run_command(repo_path, output_path, image): + cmd = "docker run " + \ + f"--volume={repo_path}:/repo_path " \ + f"--volume={output_path}:/test_output " \ + f"-e DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data {image}" + return cmd + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + + stopwatch = Stopwatch() + + temp_path = os.getenv("TEMP_PATH", os.path.abspath(".")) + repo_path = os.getenv("REPO_COPY", os.path.abspath("../../")) + + pr_info = PRInfo() + + gh = Github(get_best_robot_token()) + + if not os.path.exists(temp_path): + os.makedirs(temp_path) + + docker_image = get_image_with_version(temp_path, 'clickhouse/codebrowser') + s3_helper = S3Helper('https://s3.amazonaws.com') + + result_path = os.path.join(temp_path, "result_path") + if not os.path.exists(result_path): + os.makedirs(result_path) + + run_command = get_run_command(repo_path, result_path, docker_image) + + logging.info("Going to run codebrowser: %s", run_command) + + run_log_path = os.path.join(temp_path, "runlog.log") + + with TeePopen(run_command, run_log_path) as process: + retcode = process.wait() + if retcode == 0: + logging.info("Run successfully") + else: + logging.info("Run failed") + + subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True) + + report_path = os.path.join(result_path, "html_report") + logging.info("Report path %s", report_path) + s3_path_prefix = "codebrowser" + html_urls = s3_helper.fast_parallel_upload_dir(report_path, s3_path_prefix, 'clickhouse-test-reports') + + index_html = 'HTML report' + + test_results = [(index_html, "Look at the report")] + + report_url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME) + + print(f"::notice ::Report url: {report_url}") + + post_commit_status(gh, pr_info.sha, NAME, "Report built", "success", report_url) diff --git a/tests/ci/s3_helper.py b/tests/ci/s3_helper.py index 27a613f7787..57225dee798 100644 --- a/tests/ci/s3_helper.py +++ b/tests/ci/s3_helper.py @@ -4,6 +4,7 @@ import logging import os import re import shutil +import time from multiprocessing.dummy import Pool import boto3 @@ -83,6 +84,50 @@ class S3Helper: else: return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path) + def fast_parallel_upload_dir(self, dir_path, s3_dir_path, bucket_name): + all_files = [] + + for root, _, files in os.walk(dir_path): + for file in files: + all_files.append(os.path.join(root, file)) + + logging.info("Files found %s", len(all_files)) + + counter = 0 + t = time.time() + sum_time = 
0 + def upload_task(file_path): + nonlocal counter + nonlocal t + nonlocal sum_time + try: + s3_path = file_path.replace(dir_path, s3_dir_path) + # Retry + for i in range(5): + try: + self.client.upload_file(file_path, bucket_name, s3_path) + break + except Exception as ex: + if i == 4: + raise ex + time.sleep(0.1 * i) + + counter += 1 + if counter % 1000 == 0: + sum_time += int(time.time() - t) + print("Uploaded", counter, "-", int(time.time() - t), "s", "sum time", sum_time, "s") + t = time.time() + except Exception as ex: + logging.critical("Failed to upload file, expcetion %s", ex) + return "https://s3.amazonaws.com/{bucket}/{path}".format(bucket=bucket_name, path=s3_path) + + p = Pool(256) + + logging.basicConfig(level=logging.CRITICAL) + result = sorted(_flatten_list(p.map(upload_task, all_files))) + logging.basicConfig(level=logging.INFO) + return result + def _upload_folder_to_s3(self, folder_path, s3_folder_path, bucket_name, keep_dirs_in_s3_path, upload_symlinks): logging.info("Upload folder '%s' to bucket=%s of s3 folder '%s'", folder_path, bucket_name, s3_folder_path) if not os.path.exists(folder_path): From a962cc67da04351af2f86de235b066a4a86ae5cc Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 20 Dec 2021 22:45:30 +0300 Subject: [PATCH 07/28] Fix deps --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3f1ab153719..d1f05b94025 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -50,7 +50,7 @@ jobs: name: changed_images path: ${{ runner.temp }}/docker_images_check/changed_images.json WoboqCodebrowser: - needs: CheckLabels + needs: DockerHubPush runs-on: [self-hosted, style-checker] steps: - name: Set envs From e194b38096d20a6157d8bdb5f1dfcb5f3153c548 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 20 Dec 2021 22:54:04 +0300 Subject: [PATCH 08/28] Followup --- .github/workflows/main.yml | 2 ++ tests/ci/codebrowser_check.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d1f05b94025..09279db321e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -64,6 +64,8 @@ jobs: sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE - name: Check out repository code uses: actions/checkout@v2 + with: + submodules: 'true' - name: Download changed images uses: actions/download-artifact@v2 with: diff --git a/tests/ci/codebrowser_check.py b/tests/ci/codebrowser_check.py index 3d4b4993a90..0207826df89 100644 --- a/tests/ci/codebrowser_check.py +++ b/tests/ci/codebrowser_check.py @@ -20,7 +20,7 @@ NAME = "Woboq Build (actions)" def get_run_command(repo_path, output_path, image): cmd = "docker run " + \ - f"--volume={repo_path}:/repo_path " \ + f"--volume={repo_path}:/repo_folder " \ f"--volume={output_path}:/test_output " \ f"-e DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data {image}" return cmd From a064410a3969bf31ec3095d68472bc9a6955237d Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 21 Dec 2021 00:03:43 +0300 Subject: [PATCH 09/28] Fix --- tests/integration/test_storage_rabbitmq/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 5342473aefa..01f13bfcd42 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -67,8 +67,8 @@ def 
rabbitmq_cluster(): def rabbitmq_setup_teardown(): print("RabbitMQ is available - running test") yield # run test - for table_name in ['view', 'consumer', 'rabbitmq']: - instance.query(f'DROP TABLE IF EXISTS test.{table_name}') + instance.query(f'DROP DATABASE test') + instance.query('CREATE DATABASE test') # Tests From d51b10124400d8a24e785a94bda2d259f66873d9 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 21 Dec 2021 00:06:24 +0300 Subject: [PATCH 10/28] Update test.py --- tests/integration/test_storage_rabbitmq/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 01f13bfcd42..69c3ed944c0 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -67,7 +67,7 @@ def rabbitmq_cluster(): def rabbitmq_setup_teardown(): print("RabbitMQ is available - running test") yield # run test - instance.query(f'DROP DATABASE test') + instance.query('DROP DATABASE test') instance.query('CREATE DATABASE test') From 175ad8a9891627d655ab876e924ba481b06f70a0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 21 Dec 2021 00:20:08 +0300 Subject: [PATCH 11/28] Fix --- src/Interpreters/ExpressionAnalyzer.cpp | 27 ++------------ .../02006_test_positional_arguments.reference | 37 ------------------- .../02006_test_positional_arguments.sql | 10 ----- 3 files changed, 3 insertions(+), 71 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 935f47f944c..9e7645b51f8 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -117,7 +117,6 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q /// In case of expression/function (order by 1+2 and 2*x1, greatest(1, 2)) replace /// positions only if all literals are numbers, otherwise it is not positional. - bool positional = true; /// Case when GROUP BY element is position. if (const auto * ast_literal = typeid_cast(argument.get())) @@ -177,32 +176,12 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q } } else - positional = false; - } - else if (const auto * ast_function = typeid_cast(argument.get())) - { - if (ast_function->arguments) - { - /// Check that all literal arguments are integers in advance to be able to always - /// throw exception at once if argument is out of position bounds. 
- for (auto & arg : ast_function->arguments->children) - { - if (const auto * ast_literal = typeid_cast(arg.get())) - { - auto which = ast_literal->value.getType(); - if (which != Field::Types::UInt64) - return false; - } - } - - for (auto & arg : ast_function->arguments->children) - positional &= checkPositionalArguments(arg, select_query, expression); - } + return false; } else - positional = false; + return false; - return positional; + return true; } void replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_query, ASTSelectQuery::Expression expression) diff --git a/tests/queries/0_stateless/02006_test_positional_arguments.reference b/tests/queries/0_stateless/02006_test_positional_arguments.reference index dd4927c2e15..5fc070ffd0b 100644 --- a/tests/queries/0_stateless/02006_test_positional_arguments.reference +++ b/tests/queries/0_stateless/02006_test_positional_arguments.reference @@ -46,22 +46,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 1; 100 100 1 10 1 10 1 10 100 -explain syntax select x3, x2, x1 from test order by 1 + 1; -SELECT - x3, - x2, - x1 -FROM test -ORDER BY x3 + x3 ASC -explain syntax select x3, x2, x1 from test order by (1 + 1) * 3; -SELECT - x3, - x2, - x1 -FROM test -ORDER BY (x3 + x3) * x1 ASC -select x2, x1 from test group by x2 + x1; -- { serverError 215 } -select x2, x1 from test group by 1 + 2; -- { serverError 215 } explain syntax select x3, x2, x1 from test order by 1; SELECT x3, @@ -110,27 +94,6 @@ GROUP BY x2 select max(x1), x2 from test group by 1, 2; -- { serverError 43 } select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 } -select x1 + x2, x3 from test group by x1 + x2, x3; -11 100 -200 1 -11 200 -11 10 -select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2 -1 100 100 -1 100 100 -10 1 10 -100 10 1 -200 1 10 -200 10 1 -explain syntax select x1, x3 from test group by 1 + 2, 1, 2; -SELECT - x1, - x3 -FROM test -GROUP BY - x1 + x3, - x1, - x3 explain syntax select x1 + x3, x3 from test group by 1, 2; SELECT x1 + x3, diff --git a/tests/queries/0_stateless/02006_test_positional_arguments.sql b/tests/queries/0_stateless/02006_test_positional_arguments.sql index 3dbbd42987c..3a2cf76f6c4 100644 --- a/tests/queries/0_stateless/02006_test_positional_arguments.sql +++ b/tests/queries/0_stateless/02006_test_positional_arguments.sql @@ -22,12 +22,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 3; select x1, x2, x3 from test order by x3 limit 1 by x1; select x1, x2, x3 from test order by 3 limit 1 by 1; -explain syntax select x3, x2, x1 from test order by 1 + 1; -explain syntax select x3, x2, x1 from test order by (1 + 1) * 3; - -select x2, x1 from test group by x2 + x1; -- { serverError 215 } -select x2, x1 from test group by 1 + 2; -- { serverError 215 } - explain syntax select x3, x2, x1 from test order by 1; explain syntax select x3 + 1, x2, x1 from test order by 1; explain syntax select x3, x3 - x2, x2, x1 from test order by 2; @@ -37,11 +31,7 @@ explain syntax select 1 + greatest(x1, 1), x2 from test group by 1, 2; select max(x1), x2 from test group by 1, 2; -- { serverError 43 } select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 } -select x1 + x2, x3 from test group by x1 + x2, x3; -select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2 - -explain syntax select x1, x3 from test group by 1 + 2, 1, 2; explain syntax select x1 + x3, x3 from test group by 1, 2; create table test2(x1 Int, x2 Int, x3 Int) 
engine=Memory; From 7a195708533cfd70e649219a1ec6cf58d368e29b Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Tue, 21 Dec 2021 11:43:14 +0800 Subject: [PATCH 12/28] keeper handler should remove operation when response sent --- src/Server/KeeperTCPHandler.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 67abd6db13a..0c5d7d93689 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -557,6 +557,8 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response std::lock_guard lock(conn_stats_mutex); conn_stats.updateLatency(elapsed); } + + operations.erase(response->xid); keeper_dispatcher->updateKeeperStatLatency(elapsed); last_op.set(std::make_unique(LastOp{ From e0f2fe457df79321a92f4c922bed91c7a913c34a Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 21 Dec 2021 10:29:27 +0300 Subject: [PATCH 13/28] Update test.py --- tests/integration/test_storage_rabbitmq/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 69c3ed944c0..2c2a9e41509 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -67,7 +67,7 @@ def rabbitmq_cluster(): def rabbitmq_setup_teardown(): print("RabbitMQ is available - running test") yield # run test - instance.query('DROP DATABASE test') + instance.query('DROP DATABASE test NO DELAY') instance.query('CREATE DATABASE test') From 532b01c3718262b8608c15918e41c7e30c006fff Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 10:49:22 +0300 Subject: [PATCH 14/28] Followup --- .github/workflows/main.yml | 3 ++- docker/test/codebrowser/Dockerfile | 4 ++-- tests/ci/codebrowser_check.py | 8 ++++---- tests/ci/s3_helper.py | 10 +++++++++- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 09279db321e..d08009b96f9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -58,6 +58,7 @@ jobs: cat >> "$GITHUB_ENV" << 'EOF' TEMP_PATH=${{runner.temp}}/codebrowser REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse + IMAGES_PATH=${{runner.temp}}/images_path EOF - name: Clear repository run: | @@ -70,7 +71,7 @@ jobs: uses: actions/download-artifact@v2 with: name: changed_images - path: ${{ env.TEMP_PATH }} + path: ${{ env.IMAGES_PATH }} - name: Codebrowser run: | sudo rm -fr $TEMP_PATH diff --git a/docker/test/codebrowser/Dockerfile b/docker/test/codebrowser/Dockerfile index dea102b3e4e..6ffe94beff4 100644 --- a/docker/test/codebrowser/Dockerfile +++ b/docker/test/codebrowser/Dockerfile @@ -23,12 +23,12 @@ ENV SOURCE_DIRECTORY=/repo_folder ENV BUILD_DIRECTORY=/build ENV HTML_RESULT_DIRECTORY=$BUILD_DIRECTORY/html_report ENV SHA=nosha -ENV DATA="data" +ENV DATA="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data" CMD mkdir -p $BUILD_DIRECTORY && cd $BUILD_DIRECTORY && \ cmake $SOURCE_DIRECTORY -DCMAKE_CXX_COMPILER=/usr/bin/clang\+\+-13 -DCMAKE_C_COMPILER=/usr/bin/clang-13 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DENABLE_EMBEDDED_COMPILER=0 -DENABLE_S3=0 && \ mkdir -p $HTML_RESULT_DIRECTORY && \ $CODEGEN -b $BUILD_DIRECTORY -a -o $HTML_RESULT_DIRECTORY -p ClickHouse:$SOURCE_DIRECTORY:$SHA -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \ cp -r $STATIC_DATA $HTML_RESULT_DIRECTORY/ &&\ - $CODEINDEX $HTML_RESULT_DIRECTORY 
-d $DATA | ts '%Y-%m-%d %H:%M:%S' && \ + $CODEINDEX $HTML_RESULT_DIRECTORY -d "$DATA" | ts '%Y-%m-%d %H:%M:%S' && \ mv $HTML_RESULT_DIRECTORY /test_output diff --git a/tests/ci/codebrowser_check.py b/tests/ci/codebrowser_check.py index 0207826df89..bc3b57c2d91 100644 --- a/tests/ci/codebrowser_check.py +++ b/tests/ci/codebrowser_check.py @@ -7,6 +7,7 @@ import logging from github import Github +from env_helper import IMAGES_PATH, REPO_COPY from stopwatch import Stopwatch from upload_result_helper import upload_results from s3_helper import S3Helper @@ -22,7 +23,7 @@ def get_run_command(repo_path, output_path, image): cmd = "docker run " + \ f"--volume={repo_path}:/repo_folder " \ f"--volume={output_path}:/test_output " \ - f"-e DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data {image}" + f"-e 'DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data' {image}" return cmd if __name__ == "__main__": @@ -31,7 +32,6 @@ if __name__ == "__main__": stopwatch = Stopwatch() temp_path = os.getenv("TEMP_PATH", os.path.abspath(".")) - repo_path = os.getenv("REPO_COPY", os.path.abspath("../../")) pr_info = PRInfo() @@ -40,14 +40,14 @@ if __name__ == "__main__": if not os.path.exists(temp_path): os.makedirs(temp_path) - docker_image = get_image_with_version(temp_path, 'clickhouse/codebrowser') + docker_image = get_image_with_version(IMAGES_PATH, 'clickhouse/codebrowser') s3_helper = S3Helper('https://s3.amazonaws.com') result_path = os.path.join(temp_path, "result_path") if not os.path.exists(result_path): os.makedirs(result_path) - run_command = get_run_command(repo_path, result_path, docker_image) + run_command = get_run_command(REPO_COPY, result_path, docker_image) logging.info("Going to run codebrowser: %s", run_command) diff --git a/tests/ci/s3_helper.py b/tests/ci/s3_helper.py index 57225dee798..753f036a8d7 100644 --- a/tests/ci/s3_helper.py +++ b/tests/ci/s3_helper.py @@ -102,10 +102,18 @@ class S3Helper: nonlocal sum_time try: s3_path = file_path.replace(dir_path, s3_dir_path) + metadata = {} + if s3_path.endswith("html"): + metadata['ContentType'] = "text/html; charset=utf-8" + elif s3_path.endswith("css"): + metadata['ContentType'] = "text/css; charset=utf-8" + elif s3_path.endswith("js"): + metadata['ContentType'] = "text/javascript; charset=utf-8" + # Retry for i in range(5): try: - self.client.upload_file(file_path, bucket_name, s3_path) + self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata) break except Exception as ex: if i == 4: From ebcb66a361b47cb52d4455f0047c6f5aba1f55fb Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 20 Dec 2021 17:17:02 +0300 Subject: [PATCH 15/28] Fix --- src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp | 1 - src/Disks/IO/ReadBufferFromRemoteFSGather.h | 5 +++++ src/IO/AsynchronousReadBufferFromFileDescriptor.cpp | 2 +- src/Storages/MergeTree/MergeTreeReadPool.cpp | 4 +++- tests/integration/test_merge_tree_s3/test.py | 2 +- 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 9cadc9a08a7..c8484e6088d 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -257,7 +257,6 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence { ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks); bytes_to_ignore = 
file_offset_of_buffer_end - file_offset_before_seek; - prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); } else { diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index 00c8fbec670..ae1b8d16f33 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -69,6 +69,11 @@ private: size_t file_offset_of_buffer_end = 0; + /** + * File: |___________________| + * Buffer: |~~~~~~~| + * file_offset_of_buffer_end: ^ + */ size_t bytes_to_ignore = 0; size_t read_until_position = 0; diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index 995ed443428..a27c9035c61 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -91,7 +91,7 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl() { /// No pending request. Do synchronous read. - auto [size, offset] = readInto(memory.data(), memory.size()).get(); + auto [size, _] = readInto(memory.data(), memory.size()).get(); file_offset_of_buffer_end += size; if (size) diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index 09542c30636..c89affb5365 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -198,7 +198,9 @@ std::vector MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts & for (const auto i : collections::range(0, parts.size())) { const auto & part = parts[i]; - is_part_on_remote_disk[i] = part.data_part->isStoredOnRemoteDisk(); + bool part_on_remote_disk = part.data_part->isStoredOnRemoteDisk(); + is_part_on_remote_disk[i] = part_on_remote_disk; + do_not_steal_tasks |= part_on_remote_disk; /// Read marks for every data part. 
size_t sum_marks = 0; diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 9e2e9a4b58b..04981523432 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -463,7 +463,7 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name): node = cluster.instances[node_name] node.query("DROP TABLE IF EXISTS s3_test NO DELAY") node.query("CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';") - node.query("INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 9000000") + node.query("INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000") node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10") node.query("DROP TABLE IF EXISTS s3_test NO DELAY") minio = cluster.minio_client From 754785fee5c8b8f972d11db0f1539fbfa8e71821 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 11:07:24 +0300 Subject: [PATCH 16/28] Better container --- src/Server/KeeperTCPHandler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Server/KeeperTCPHandler.h b/src/Server/KeeperTCPHandler.h index fb6541d1f53..f98b269b8be 100644 --- a/src/Server/KeeperTCPHandler.h +++ b/src/Server/KeeperTCPHandler.h @@ -93,7 +93,7 @@ private: Poco::Timestamp established; - using Operations = std::map; + using Operations = std::unordered_map; Operations operations; LastOpMultiVersion last_op; From a89a05c0dacdc89168b51d594280f1687ecd75be Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 14:16:21 +0300 Subject: [PATCH 17/28] Finally woboq is ready --- docker/test/codebrowser/Dockerfile | 2 +- tests/ci/codebrowser_check.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/test/codebrowser/Dockerfile b/docker/test/codebrowser/Dockerfile index 6ffe94beff4..d1059b3dacc 100644 --- a/docker/test/codebrowser/Dockerfile +++ b/docker/test/codebrowser/Dockerfile @@ -23,7 +23,7 @@ ENV SOURCE_DIRECTORY=/repo_folder ENV BUILD_DIRECTORY=/build ENV HTML_RESULT_DIRECTORY=$BUILD_DIRECTORY/html_report ENV SHA=nosha -ENV DATA="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data" +ENV DATA="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data" CMD mkdir -p $BUILD_DIRECTORY && cd $BUILD_DIRECTORY && \ cmake $SOURCE_DIRECTORY -DCMAKE_CXX_COMPILER=/usr/bin/clang\+\+-13 -DCMAKE_C_COMPILER=/usr/bin/clang-13 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DENABLE_EMBEDDED_COMPILER=0 -DENABLE_S3=0 && \ diff --git a/tests/ci/codebrowser_check.py b/tests/ci/codebrowser_check.py index bc3b57c2d91..1f82fc2c1c7 100644 --- a/tests/ci/codebrowser_check.py +++ b/tests/ci/codebrowser_check.py @@ -23,7 +23,7 @@ def get_run_command(repo_path, output_path, image): cmd = "docker run " + \ f"--volume={repo_path}:/repo_folder " \ f"--volume={output_path}:/test_output " \ - f"-e 'DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/html_report/data' {image}" + f"-e 'DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data' {image}" return cmd if __name__ == "__main__": From 5929d65e9e59e10422b66d6c3af41bb97bebecfd Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 14:22:47 +0300 Subject: [PATCH 18/28] Add regular job --- .github/workflows/woboq.yml | 41 +++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 .github/workflows/woboq.yml diff --git 
a/.github/workflows/woboq.yml b/.github/workflows/woboq.yml new file mode 100644 index 00000000000..a513c2555ec --- /dev/null +++ b/.github/workflows/woboq.yml @@ -0,0 +1,41 @@ +name: WoboqBuilder +env: + # Force the stdout and stderr streams to be unbuffered + PYTHONUNBUFFERED: 1 + +concurrency: + group: cherry-pick +on: # yamllint disable-line rule:truthy + schedule: + - cron: '0 */18 * *' +jobs: + # don't use dockerhub push because this image updates so rarely + WoboqCodebrowser: + runs-on: [self-hosted, style-checker] + steps: + - name: Set envs + run: | + cat >> "$GITHUB_ENV" << 'EOF' + TEMP_PATH=${{runner.temp}}/codebrowser + REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse + IMAGES_PATH=${{runner.temp}}/images_path + EOF + - name: Clear repository + run: | + sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE + - name: Check out repository code + uses: actions/checkout@v2 + with: + submodules: 'true' + - name: Codebrowser + run: | + sudo rm -fr $TEMP_PATH + mkdir -p $TEMP_PATH + cp -r $GITHUB_WORKSPACE $TEMP_PATH + cd $REPO_COPY/tests/ci && python3 codebrowser_check.py + - name: Cleanup + if: always() + run: | + docker kill $(docker ps -q) ||: + docker rm -f $(docker ps -a -q) ||: + sudo rm -fr $TEMP_PATH From 323e1a50874d3ac5d0d70d134f37c7cfaeb79cf5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 14:23:17 +0300 Subject: [PATCH 19/28] Add woboq workflow --- .github/workflows/woboq.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/woboq.yml b/.github/workflows/woboq.yml index a513c2555ec..b238e628f07 100644 --- a/.github/workflows/woboq.yml +++ b/.github/workflows/woboq.yml @@ -4,7 +4,7 @@ env: PYTHONUNBUFFERED: 1 concurrency: - group: cherry-pick + group: woboq on: # yamllint disable-line rule:truthy schedule: - cron: '0 */18 * *' From 84811a55c59f26ded65a11717e3ed33cd115f8bd Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 18:17:53 +0300 Subject: [PATCH 20/28] Fix path --- tests/ci/codebrowser_check.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ci/codebrowser_check.py b/tests/ci/codebrowser_check.py index 1f82fc2c1c7..eeb06d2c684 100644 --- a/tests/ci/codebrowser_check.py +++ b/tests/ci/codebrowser_check.py @@ -67,7 +67,7 @@ if __name__ == "__main__": s3_path_prefix = "codebrowser" html_urls = s3_helper.fast_parallel_upload_dir(report_path, s3_path_prefix, 'clickhouse-test-reports') - index_html = 'HTML report' + index_html = 'HTML report' test_results = [(index_html, "Look at the report")] From eba9c15b3128b9d5de2675adda94fbe0b3149418 Mon Sep 17 00:00:00 2001 From: Denis Glazachev Date: Tue, 21 Dec 2021 19:36:44 +0400 Subject: [PATCH 21/28] Formatting fixes --- docs/en/operations/external-authenticators/kerberos.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/en/operations/external-authenticators/kerberos.md b/docs/en/operations/external-authenticators/kerberos.md index 2e2a88dc7a8..da84c1f6a89 100644 --- a/docs/en/operations/external-authenticators/kerberos.md +++ b/docs/en/operations/external-authenticators/kerberos.md @@ -14,11 +14,11 @@ To enable Kerberos, one should include `kerberos` section in `config.xml`. This #### Parameters: - `principal` - canonical service principal name that will be acquired and used when accepting security contexts. - - This parameter is optional, if omitted, the default principal will be used. + - This parameter is optional, if omitted, the default principal will be used. 
- `realm` - a realm, that will be used to restrict authentication to only those requests whose initiator's realm matches it. - - This parameter is optional, if omitted, no additional filtering by realm will be applied. + - This parameter is optional, if omitted, no additional filtering by realm will be applied. Example (goes into `config.xml`): @@ -75,7 +75,7 @@ In order to enable Kerberos authentication for the user, specify `kerberos` sect Parameters: - `realm` - a realm that will be used to restrict authentication to only those requests whose initiator's realm matches it. - - This parameter is optional, if omitted, no additional filtering by realm will be applied. + - This parameter is optional, if omitted, no additional filtering by realm will be applied. Example (goes into `users.xml`): From de70a57700f83ca2c8df1d579327ae95aa9f4224 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 21 Dec 2021 21:46:19 +0300 Subject: [PATCH 22/28] Better --- src/Interpreters/ExpressionAnalyzer.cpp | 95 +++++++++++-------------- 1 file changed, 42 insertions(+), 53 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 9e7645b51f8..c2d414d7c33 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -115,71 +115,60 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q } } - /// In case of expression/function (order by 1+2 and 2*x1, greatest(1, 2)) replace - /// positions only if all literals are numbers, otherwise it is not positional. + const auto * ast_literal = typeid_cast(argument.get()); + if (!ast_literal) + return false; - /// Case when GROUP BY element is position. - if (const auto * ast_literal = typeid_cast(argument.get())) + auto which = ast_literal->value.getType(); + if (which != Field::Types::UInt64) + return false; + + auto pos = ast_literal->value.get(); + if (!pos || pos > columns.size()) + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Positional argument out of bounds: {} (exprected in range [1, {}]", + pos, columns.size()); + + const auto & column = columns[--pos]; + if (typeid_cast(column.get())) { - auto which = ast_literal->value.getType(); - if (which == Field::Types::UInt64) + argument = column->clone(); + } + else if (typeid_cast(column.get())) + { + std::function throw_if_aggregate_function = [&](ASTPtr node) { - auto pos = ast_literal->value.get(); - if (pos > 0 && pos <= columns.size()) + if (const auto * function = typeid_cast(node.get())) { - const auto & column = columns[--pos]; - if (typeid_cast(column.get())) + auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name); + if (is_aggregate_function) { - argument = column->clone(); - } - else if (typeid_cast(column.get())) - { - std::function throw_if_aggregate_function = [&](ASTPtr node) - { - if (const auto * function = typeid_cast(node.get())) - { - auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name); - if (is_aggregate_function) - { - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Illegal value (aggregate function) for positional argument in {}", - ASTSelectQuery::expressionToString(expression)); - } - else - { - if (function->arguments) - { - for (const auto & arg : function->arguments->children) - throw_if_aggregate_function(arg); - } - } - } - }; - - if (expression == ASTSelectQuery::Expression::GROUP_BY) - throw_if_aggregate_function(column); - - argument = 
column->clone(); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal value (aggregate function) for positional argument in {}", + ASTSelectQuery::expressionToString(expression)); } else { - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Illegal value for positional argument in {}", - ASTSelectQuery::expressionToString(expression)); + if (function->arguments) + { + for (const auto & arg : function->arguments->children) + throw_if_aggregate_function(arg); + } } } - else if (pos > columns.size() || !pos) - { - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, - "Positional argument out of bounds: {} (exprected in range [1, {}]", - pos, columns.size()); - } - } - else - return false; + }; + + if (expression == ASTSelectQuery::Expression::GROUP_BY) + throw_if_aggregate_function(column); + + argument = column->clone(); } else - return false; + { + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Illegal value for positional argument in {}", + ASTSelectQuery::expressionToString(expression)); + } return true; } From 0c0f95780f7cb6846386a70dc55e813045929cac Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 21 Dec 2021 23:04:29 +0300 Subject: [PATCH 23/28] Remove woboq --- .github/workflows/main.yml | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d08009b96f9..4cda4eac33e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -49,41 +49,6 @@ jobs: with: name: changed_images path: ${{ runner.temp }}/docker_images_check/changed_images.json - WoboqCodebrowser: - needs: DockerHubPush - runs-on: [self-hosted, style-checker] - steps: - - name: Set envs - run: | - cat >> "$GITHUB_ENV" << 'EOF' - TEMP_PATH=${{runner.temp}}/codebrowser - REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse - IMAGES_PATH=${{runner.temp}}/images_path - EOF - - name: Clear repository - run: | - sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE - - name: Check out repository code - uses: actions/checkout@v2 - with: - submodules: 'true' - - name: Download changed images - uses: actions/download-artifact@v2 - with: - name: changed_images - path: ${{ env.IMAGES_PATH }} - - name: Codebrowser - run: | - sudo rm -fr $TEMP_PATH - mkdir -p $TEMP_PATH - cp -r $GITHUB_WORKSPACE $TEMP_PATH - cd $REPO_COPY/tests/ci && python3 codebrowser_check.py - - name: Cleanup - if: always() - run: | - docker kill $(docker ps -q) ||: - docker rm -f $(docker ps -a -q) ||: - sudo rm -fr $TEMP_PATH StyleCheck: needs: DockerHubPush runs-on: [self-hosted, style-checker] From aeb0e76dbdc184ba2cf3b525f586ff721ea00897 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Dec 2021 10:19:53 +0300 Subject: [PATCH 24/28] Fix cron expression --- .github/workflows/woboq.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/woboq.yml b/.github/workflows/woboq.yml index b238e628f07..1f90fa55b3a 100644 --- a/.github/workflows/woboq.yml +++ b/.github/workflows/woboq.yml @@ -7,7 +7,7 @@ concurrency: group: woboq on: # yamllint disable-line rule:truthy schedule: - - cron: '0 */18 * *' + - cron: '0 */18 * * *' jobs: # don't use dockerhub push because this image updates so rarely WoboqCodebrowser: From 7a00bc96ed42a0a17759d66e1539c9f933af3aa7 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Dec 2021 10:54:50 +0300 Subject: [PATCH 25/28] Make great again --- .../Dockerfile | 0 .../app.py | 36 +++++++++++++------ .../requirements.txt | 0 tests/ci/pr_info.py | 15 ++++++-- 
tests/ci/run_check.py | 2 +- 5 files changed, 39 insertions(+), 14 deletions(-) rename tests/ci/{cancel_workflow_lambda => cancel_and_rerun_workflow_lambda}/Dockerfile (100%) rename tests/ci/{cancel_workflow_lambda => cancel_and_rerun_workflow_lambda}/app.py (70%) rename tests/ci/{cancel_workflow_lambda => cancel_and_rerun_workflow_lambda}/requirements.txt (100%) diff --git a/tests/ci/cancel_workflow_lambda/Dockerfile b/tests/ci/cancel_and_rerun_workflow_lambda/Dockerfile similarity index 100% rename from tests/ci/cancel_workflow_lambda/Dockerfile rename to tests/ci/cancel_and_rerun_workflow_lambda/Dockerfile diff --git a/tests/ci/cancel_workflow_lambda/app.py b/tests/ci/cancel_and_rerun_workflow_lambda/app.py similarity index 70% rename from tests/ci/cancel_workflow_lambda/app.py rename to tests/ci/cancel_and_rerun_workflow_lambda/app.py index e475fcb931a..1f952b97daa 100644 --- a/tests/ci/cancel_workflow_lambda/app.py +++ b/tests/ci/cancel_and_rerun_workflow_lambda/app.py @@ -70,19 +70,19 @@ def _exec_get_with_retry(url): raise Exception("Cannot execute GET request with retries") -def get_workflows_cancel_urls_for_pull_request(pull_request_event): +def get_workflows_urls_for_pull_request(pull_request_event, url_name, check_status): head_branch = pull_request_event['head']['ref'] print("PR", pull_request_event['number'], "has head ref", head_branch) workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}") - workflows_urls_to_cancel = set([]) + workflows_urls = set([]) for workflow in workflows['workflow_runs']: - if workflow['status'] != 'completed': - print("Workflow", workflow['url'], "not finished, going to be cancelled") - workflows_urls_to_cancel.add(workflow['cancel_url']) + if check_status(workflow['status']): + print("Workflow", workflow['url'], "going to check workflow") + workflows_urls.add(workflow[url_name]) else: - print("Workflow", workflow['url'], "already finished, will not try to cancel") + print("Workflow", workflow['url'], "doesn't satisfy status condition") - return workflows_urls_to_cancel + return workflows_urls def _exec_post_with_retry(url, token): headers = { @@ -99,7 +99,7 @@ def _exec_post_with_retry(url, token): raise Exception("Cannot execute POST request with retry") -def cancel_workflows(urls_to_cancel, token): +def exec_workflow_url(urls_to_cancel, token): for url in urls_to_cancel: print("Cancelling workflow using url", url) _exec_post_with_retry(url, token) @@ -117,9 +117,25 @@ def main(event): print("PR has labels", labels) if action == 'closed' or 'do not test' in labels: print("PR merged/closed or manually labeled 'do not test' will kill workflows") - workflows_to_cancel = get_workflows_cancel_urls_for_pull_request(pull_request) + def check_status(status): + return status != 'completed' + workflows_to_cancel = get_workflows_urls_for_pull_request(pull_request, 'cancel_url', check_status) print(f"Found {len(workflows_to_cancel)} workflows to cancel") - cancel_workflows(workflows_to_cancel, token) + exec_workflow_url(workflows_to_cancel, token) + elif action == 'labeled' and 'can be tested' in labels: + print("PR marked with can be tested label, rerun workflow") + + def check_status_for_cancell(status): + return status != 'completed' + workflows_to_cancel = get_workflows_urls_for_pull_request(pull_request, 'cancel_url', check_status_for_cancell) + print("Cancelling all previous workflows") + print(f"Found {len(workflows_to_cancel)} workflows to cancel") + exec_workflow_url(workflows_to_cancel, token) + def 
check_status_for_rerun(status): + return status in ('completed', 'cancelled') + workflows_to_rerun = get_workflows_urls_for_pull_request(pull_request, 'rerun_url', check_status_for_rerun) + print(f"Found {len(workflows_to_rerun)} workflows") + exec_workflow_url(workflows_to_rerun, token) else: print("Nothing to do") diff --git a/tests/ci/cancel_workflow_lambda/requirements.txt b/tests/ci/cancel_and_rerun_workflow_lambda/requirements.txt similarity index 100% rename from tests/ci/cancel_workflow_lambda/requirements.txt rename to tests/ci/cancel_and_rerun_workflow_lambda/requirements.txt diff --git a/tests/ci/pr_info.py b/tests/ci/pr_info.py index b1775f0fc6c..7a3e24a52d4 100644 --- a/tests/ci/pr_info.py +++ b/tests/ci/pr_info.py @@ -33,7 +33,7 @@ def get_pr_for_commit(sha, ref): class PRInfo: - def __init__(self, github_event=None, need_orgs=False, need_changed_files=False): + def __init__(self, github_event=None, need_orgs=False, need_changed_files=False, labels_from_api=False): if not github_event: if GITHUB_EVENT_PATH: with open(GITHUB_EVENT_PATH, 'r', encoding='utf-8') as event_file: @@ -61,7 +61,12 @@ class PRInfo: self.head_ref = github_event['pull_request']['head']['ref'] self.head_name = github_event['pull_request']['head']['repo']['full_name'] - self.labels = {l['name'] for l in github_event['pull_request']['labels']} + if labels_from_api: + self.labels = {l['name'] for l in github_event['pull_request']['labels']} + else: + response = requests.get("https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") + self.labels = {l['name'] for l in response.json()} + self.user_login = github_event['pull_request']['user']['login'] self.user_orgs = set([]) if need_orgs: @@ -90,7 +95,11 @@ class PRInfo: f"https://api.github.com/repos/{GITHUB_REPOSITORY}/compare/{github_event['before']}...{self.sha}" else: self.number = pull_request['number'] - self.labels = {l['name'] for l in pull_request['labels']} + if labels_from_api: + self.labels = {l['name'] for l in pull_request['labels']} + else: + response = requests.get("https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") + self.labels = {l['name'] for l in response.json()} self.base_ref = pull_request['base']['ref'] self.base_name = pull_request['base']['repo']['full_name'] self.head_ref = pull_request['head']['ref'] diff --git a/tests/ci/run_check.py b/tests/ci/run_check.py index 5fa6a228e46..d0acaa20161 100644 --- a/tests/ci/run_check.py +++ b/tests/ci/run_check.py @@ -109,7 +109,7 @@ def should_run_checks_for_pr(pr_info): if __name__ == "__main__": logging.basicConfig(level=logging.INFO) - pr_info = PRInfo(need_orgs=True) + pr_info = PRInfo(need_orgs=True, labels_from_api=True) can_run, description = should_run_checks_for_pr(pr_info) gh = Github(get_best_robot_token()) commit = get_commit(gh, pr_info.sha) From b300de6a4a0ad23f15c6dcf792fcfbd64a7d42af Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Dec 2021 11:13:04 +0300 Subject: [PATCH 26/28] Better --- .github/workflows/main.yml | 4 ++-- tests/ci/cancel_and_rerun_workflow_lambda/app.py | 11 +++++++++-- tests/ci/pr_info.py | 9 +++++---- tests/ci/run_check.py | 1 + 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4cda4eac33e..78f559c8eb8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -920,7 +920,7 @@ jobs: - BuilderDebMsan - BuilderDebDebug runs-on: [self-hosted, style-checker] - if: always() + if: ${{ success() || failure() }} steps: - 
name: Set envs run: | @@ -960,7 +960,7 @@ jobs: - BuilderBinDarwinAarch64 - BuilderBinPPC64 runs-on: [self-hosted, style-checker] - if: always() + if: ${{ success() || failure() }} steps: - name: Set envs run: | diff --git a/tests/ci/cancel_and_rerun_workflow_lambda/app.py b/tests/ci/cancel_and_rerun_workflow_lambda/app.py index 1f952b97daa..28aedcd617f 100644 --- a/tests/ci/cancel_and_rerun_workflow_lambda/app.py +++ b/tests/ci/cancel_and_rerun_workflow_lambda/app.py @@ -131,11 +131,18 @@ def main(event): print("Cancelling all previous workflows") print(f"Found {len(workflows_to_cancel)} workflows to cancel") exec_workflow_url(workflows_to_cancel, token) + def check_status_for_rerun(status): return status in ('completed', 'cancelled') workflows_to_rerun = get_workflows_urls_for_pull_request(pull_request, 'rerun_url', check_status_for_rerun) - print(f"Found {len(workflows_to_rerun)} workflows") - exec_workflow_url(workflows_to_rerun, token) + for _ in range(10): + print(f"Found {len(workflows_to_rerun)} workflows") + if len(workflows_to_rerun) > 0: + exec_workflow_url(workflows_to_rerun, token) + break + print("No workflows found, wait until cancelled") + time.sleep(3) + workflows_to_rerun = get_workflows_urls_for_pull_request(pull_request, 'rerun_url', check_status_for_rerun) else: print("Nothing to do") diff --git a/tests/ci/pr_info.py b/tests/ci/pr_info.py index 7a3e24a52d4..3a9f8a44943 100644 --- a/tests/ci/pr_info.py +++ b/tests/ci/pr_info.py @@ -62,10 +62,10 @@ class PRInfo: self.head_name = github_event['pull_request']['head']['repo']['full_name'] if labels_from_api: - self.labels = {l['name'] for l in github_event['pull_request']['labels']} - else: response = requests.get("https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") self.labels = {l['name'] for l in response.json()} + else: + self.labels = {l['name'] for l in github_event['pull_request']['labels']} self.user_login = github_event['pull_request']['user']['login'] self.user_orgs = set([]) @@ -96,10 +96,11 @@ class PRInfo: else: self.number = pull_request['number'] if labels_from_api: - self.labels = {l['name'] for l in pull_request['labels']} - else: response = requests.get("https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") self.labels = {l['name'] for l in response.json()} + else: + self.labels = {l['name'] for l in pull_request['labels']} + self.base_ref = pull_request['base']['ref'] self.base_name = pull_request['base']['repo']['full_name'] self.head_ref = pull_request['head']['ref'] diff --git a/tests/ci/run_check.py b/tests/ci/run_check.py index d0acaa20161..692cda18f20 100644 --- a/tests/ci/run_check.py +++ b/tests/ci/run_check.py @@ -90,6 +90,7 @@ def pr_is_by_trusted_user(pr_user_login, pr_user_orgs): # can be skipped entirely. def should_run_checks_for_pr(pr_info): # Consider the labels and whether the user is trusted. 
+ print("Got labels", pr_info.labels) force_labels = set(['force tests']).intersection(pr_info.labels) if force_labels: return True, "Labeled '{}'".format(', '.join(force_labels)) From 20e287f5b6b0f29d2ef24d01aa5be3cb7c327218 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Dec 2021 11:15:31 +0300 Subject: [PATCH 27/28] Followup --- tests/ci/pr_info.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/ci/pr_info.py b/tests/ci/pr_info.py index 3a9f8a44943..795fe9aaad3 100644 --- a/tests/ci/pr_info.py +++ b/tests/ci/pr_info.py @@ -62,7 +62,7 @@ class PRInfo: self.head_name = github_event['pull_request']['head']['repo']['full_name'] if labels_from_api: - response = requests.get("https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") + response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") self.labels = {l['name'] for l in response.json()} else: self.labels = {l['name'] for l in github_event['pull_request']['labels']} @@ -96,7 +96,7 @@ class PRInfo: else: self.number = pull_request['number'] if labels_from_api: - response = requests.get("https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") + response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels") self.labels = {l['name'] for l in response.json()} else: self.labels = {l['name'] for l in pull_request['labels']} From e31be8f056a85eb3a63a9d5c1ff863af64163c0a Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Dec 2021 12:25:16 +0300 Subject: [PATCH 28/28] Better check in lambda --- .../cancel_and_rerun_workflow_lambda/app.py | 91 ++++++++++++------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/tests/ci/cancel_and_rerun_workflow_lambda/app.py b/tests/ci/cancel_and_rerun_workflow_lambda/app.py index 28aedcd617f..bd1dc394086 100644 --- a/tests/ci/cancel_and_rerun_workflow_lambda/app.py +++ b/tests/ci/cancel_and_rerun_workflow_lambda/app.py @@ -1,12 +1,21 @@ #!/usr/bin/env python3 +from collections import namedtuple import json import time -import jwt +import jwt import requests import boto3 +NEED_RERUN_OR_CANCELL_WORKFLOWS = { + 13241696, # PR + 15834118, # Docs + 15522500, # MasterCI + 15516108, # ReleaseCI + 15797242, # BackportPR +} + # https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run # API_URL = 'https://api.github.com/repos/ClickHouse/ClickHouse' @@ -70,19 +79,32 @@ def _exec_get_with_retry(url): raise Exception("Cannot execute GET request with retries") -def get_workflows_urls_for_pull_request(pull_request_event, url_name, check_status): +WorkflowDescription = namedtuple('WorkflowDescription', + ['run_id', 'status', 'rerun_url', 'cancel_url']) + + +def get_workflows_description_for_pull_request(pull_request_event): head_branch = pull_request_event['head']['ref'] print("PR", pull_request_event['number'], "has head ref", head_branch) workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}") - workflows_urls = set([]) + workflow_descriptions = [] for workflow in workflows['workflow_runs']: - if check_status(workflow['status']): - print("Workflow", workflow['url'], "going to check workflow") - workflows_urls.add(workflow[url_name]) - else: - print("Workflow", workflow['url'], "doesn't satisfy status condition") + if workflow['workflow_id'] in NEED_RERUN_OR_CANCELL_WORKFLOWS: + workflow_descriptions.append(WorkflowDescription( + run_id=workflow['id'], + status=workflow['status'], + 
rerun_url=workflow['rerun_url'], + cancel_url=workflow['cancel_url'])) - return workflows_urls + return workflow_descriptions + +def get_workflow_description(workflow_id): + workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}") + return WorkflowDescription( + run_id=workflow['id'], + status=workflow['status'], + rerun_url=workflow['rerun_url'], + cancel_url=workflow['cancel_url']) def _exec_post_with_retry(url, token): headers = { @@ -101,9 +123,9 @@ def _exec_post_with_retry(url, token): def exec_workflow_url(urls_to_cancel, token): for url in urls_to_cancel: - print("Cancelling workflow using url", url) + print("Post for workflow workflow using url", url) _exec_post_with_retry(url, token) - print("Workflow cancelled") + print("Workflow post finished") def main(event): token = get_token_from_aws() @@ -117,32 +139,39 @@ def main(event): print("PR has labels", labels) if action == 'closed' or 'do not test' in labels: print("PR merged/closed or manually labeled 'do not test' will kill workflows") - def check_status(status): - return status != 'completed' - workflows_to_cancel = get_workflows_urls_for_pull_request(pull_request, 'cancel_url', check_status) - print(f"Found {len(workflows_to_cancel)} workflows to cancel") - exec_workflow_url(workflows_to_cancel, token) + workflow_descriptions = get_workflows_description_for_pull_request(pull_request) + urls_to_cancel = [] + for workflow_description in workflow_descriptions: + if workflow_description.status != 'completed': + urls_to_cancel.append(workflow_description.cancel_url) + print(f"Found {len(urls_to_cancel)} workflows to cancel") + exec_workflow_url(urls_to_cancel, token) elif action == 'labeled' and 'can be tested' in labels: print("PR marked with can be tested label, rerun workflow") + workflow_descriptions = get_workflows_description_for_pull_request(pull_request) + if not workflow_descriptions: + print("Not found any workflows") + return - def check_status_for_cancell(status): - return status != 'completed' - workflows_to_cancel = get_workflows_urls_for_pull_request(pull_request, 'cancel_url', check_status_for_cancell) - print("Cancelling all previous workflows") - print(f"Found {len(workflows_to_cancel)} workflows to cancel") - exec_workflow_url(workflows_to_cancel, token) + sorted_workflows = list(sorted(workflow_descriptions, key=lambda x: x.run_id)) + most_recent_workflow = sorted_workflows[-1] + print("Latest workflow", most_recent_workflow) + if most_recent_workflow.status != 'completed': + print("Latest workflow is not completed, cancelling") + exec_workflow_url([most_recent_workflow.cancel_url], token) + print("Cancelled") - def check_status_for_rerun(status): - return status in ('completed', 'cancelled') - workflows_to_rerun = get_workflows_urls_for_pull_request(pull_request, 'rerun_url', check_status_for_rerun) - for _ in range(10): - print(f"Found {len(workflows_to_rerun)} workflows") - if len(workflows_to_rerun) > 0: - exec_workflow_url(workflows_to_rerun, token) + for _ in range(30): + latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id) + print("Checking latest workflow", latest_workflow_desc) + if latest_workflow_desc.status in ('completed', 'cancelled'): + print("Finally latest workflow done, going to rerun") + exec_workflow_url([most_recent_workflow.rerun_url], token) + print("Rerun finished, exiting") break - print("No workflows found, wait until cancelled") + print("Still have strange status") time.sleep(3) - workflows_to_rerun = 
get_workflows_urls_for_pull_request(pull_request, 'rerun_url', check_status_for_rerun) + else: print("Nothing to do")
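
The cancel-and-rerun flow added in the last patch can be hard to follow inside the diff, so below is a minimal standalone sketch of the same idea: find the newest workflow run for a branch, cancel it if it is still in progress, poll until it reaches a terminal state, then hit its rerun_url. This is an illustration only, not code from the patches: the repository URL, the plain token taken from GITHUB_TOKEN, and the example branch name are assumptions, and the real lambda additionally authenticates with a GitHub App installation token and filters runs by NEED_RERUN_OR_CANCELL_WORKFLOWS.

#!/usr/bin/env python3
# Standalone sketch of the cancel-then-rerun flow; repo URL, token and branch are assumptions.

import os
import time

import requests

API_URL = "https://api.github.com/repos/ClickHouse/ClickHouse"
HEADERS = {
    # The real lambda uses a GitHub App installation token; a plain personal token is assumed here.
    "Authorization": f"token {os.environ['GITHUB_TOKEN']}",
    "Accept": "application/vnd.github.v3+json",
}


def latest_run_for_branch(branch):
    # GET /actions/runs?branch=... lists workflow runs for the given head branch.
    response = requests.get(f"{API_URL}/actions/runs", params={"branch": branch}, headers=HEADERS)
    response.raise_for_status()
    runs = response.json()["workflow_runs"]
    return max(runs, key=lambda run: run["id"]) if runs else None


def cancel_then_rerun(branch, attempts=30, delay=3):
    run = latest_run_for_branch(branch)
    if run is None:
        print("No workflow runs found for", branch)
        return
    if run["status"] != "completed":
        # Cancellation is asynchronous, so the run will not stop immediately.
        requests.post(run["cancel_url"], headers=HEADERS).raise_for_status()
    for _ in range(attempts):
        # Re-fetch the run and wait for a terminal state (same check as in the patch).
        run = requests.get(f"{API_URL}/actions/runs/{run['id']}", headers=HEADERS).json()
        if run["status"] in ("completed", "cancelled"):
            requests.post(run["rerun_url"], headers=HEADERS).raise_for_status()
            return
        time.sleep(delay)
    print("Run never reached a terminal state, giving up")


if __name__ == "__main__":
    cancel_then_rerun("example-pr-head-branch")  # placeholder branch name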