From efb09d7ff5068f172f5910103a4f51a232caaca3 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Fri, 31 Jul 2020 17:53:41 +0300 Subject: [PATCH] Cleanup for #12999. NFC. --- base/daemon/BaseDaemon.cpp | 20 ++++++++++++++---- src/Common/MemorySanitizer.h | 2 ++ src/IO/HashingReadBuffer.h | 3 ++- src/IO/PeekableReadBuffer.cpp | 11 +++++----- src/IO/ReadBuffer.h | 16 +++++++++++---- src/IO/ReadBufferAIO.cpp | 2 +- src/IO/ReadBufferFromFile.h | 2 -- src/IO/ReadBufferFromFileBase.h | 1 - src/IO/ReadBufferFromFileDescriptor.cpp | 27 ++++++++++++++++++------- src/IO/ReadBufferFromFileDescriptor.h | 8 +++----- src/IO/ReadBufferFromHDFS.h | 1 - src/IO/ReadHelpers.cpp | 7 ++++++- src/Interpreters/executeQuery.cpp | 2 +- src/Parsers/parseQuery.cpp | 6 ++++++ 14 files changed, 75 insertions(+), 33 deletions(-) diff --git a/base/daemon/BaseDaemon.cpp b/base/daemon/BaseDaemon.cpp index e9f85da3594..59af8f5c5f1 100644 --- a/base/daemon/BaseDaemon.cpp +++ b/base/daemon/BaseDaemon.cpp @@ -51,6 +51,7 @@ #include #include #include +#include #include #if !defined(ARCADIA_BUILD) @@ -76,6 +77,15 @@ static void call_default_signal_handler(int sig) raise(sig); } +const char * msan_strsignal(int sig) +{ + // Apparently strsignal is not instrumented by MemorySanitizer, so we + // have to unpoison it to avoid msan reports inside fmt library when we + // print it. + const char * signal_name = strsignal(sig); + __msan_unpoison_string(signal_name); + return signal_name; +} static constexpr size_t max_query_id_size = 127; @@ -275,12 +285,14 @@ private: if (query_id.empty()) { LOG_FATAL(log, "(version {}{}, {}) (from thread {}) (no query) Received signal {} ({})", - VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, thread_num, strsignal(sig), sig); + VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, + thread_num, msan_strsignal(sig), sig); } else { LOG_FATAL(log, "(version {}{}, {}) (from thread {}) (query_id: {}) Received signal {} ({})", - VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, thread_num, query_id, strsignal(sig), sig); + VERSION_STRING, VERSION_OFFICIAL, daemon.build_id_info, + thread_num, query_id, msan_strsignal(sig), sig); } String error_message; @@ -831,13 +843,13 @@ void BaseDaemon::handleSignal(int signal_id) onInterruptSignals(signal_id); } else - throw DB::Exception(std::string("Unsupported signal: ") + strsignal(signal_id), 0); + throw DB::Exception(std::string("Unsupported signal: ") + msan_strsignal(signal_id), 0); } void BaseDaemon::onInterruptSignals(int signal_id) { is_cancelled = true; - LOG_INFO(&logger(), "Received termination signal ({})", strsignal(signal_id)); + LOG_INFO(&logger(), "Received termination signal ({})", msan_strsignal(signal_id)); if (sigint_signals_counter >= 2) { diff --git a/src/Common/MemorySanitizer.h b/src/Common/MemorySanitizer.h index 6ece85901a8..54a92ea3a19 100644 --- a/src/Common/MemorySanitizer.h +++ b/src/Common/MemorySanitizer.h @@ -8,11 +8,13 @@ #define __msan_unpoison(X, Y) #define __msan_test_shadow(X, Y) (false) #define __msan_print_shadow(X, Y) +#define __msan_unpoison_string(X) #if defined(__has_feature) # if __has_feature(memory_sanitizer) # undef __msan_unpoison # undef __msan_test_shadow # undef __msan_print_shadow +# undef __msan_unpoison_string # include # endif #endif diff --git a/src/IO/HashingReadBuffer.h b/src/IO/HashingReadBuffer.h index 9ea646206cd..9fcd6dc6b41 100644 --- a/src/IO/HashingReadBuffer.h +++ b/src/IO/HashingReadBuffer.h @@ -33,7 +33,8 @@ private: working_buffer = in.buffer(); pos = in.position(); - calculateHash(working_buffer.begin(), working_buffer.size()); + // `pos` may be different from working_buffer.begin() when using AIO. + calculateHash(pos, working_buffer.end() - pos); return res; } diff --git a/src/IO/PeekableReadBuffer.cpp b/src/IO/PeekableReadBuffer.cpp index dd969d07549..8ad0e7b572e 100644 --- a/src/IO/PeekableReadBuffer.cpp +++ b/src/IO/PeekableReadBuffer.cpp @@ -22,6 +22,8 @@ PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ void PeekableReadBuffer::reset() { + checkStateCorrect(); + peeked_size = 0; checkpoint = nullptr; checkpoint_in_own_memory = false; @@ -31,6 +33,8 @@ void PeekableReadBuffer::reset() Buffer & sub_working = sub_buf.buffer(); BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); + + checkStateCorrect(); } bool PeekableReadBuffer::peekNext() @@ -150,7 +154,7 @@ bool PeekableReadBuffer::nextImpl() /// Switch to reading from sub_buf (or just update it if already switched) Buffer & sub_working = sub_buf.buffer(); BufferBase::set(sub_working.begin(), sub_working.size(), sub_buf.offset()); - working_buffer_offset = sub_buf.offset(); + nextimpl_working_buffer_offset = sub_buf.offset(); checkStateCorrect(); return res; @@ -159,7 +163,6 @@ bool PeekableReadBuffer::nextImpl() void PeekableReadBuffer::checkStateCorrect() const { -#ifndef NDEBUG if (checkpoint) { if (checkpointInOwnMemory()) @@ -190,7 +193,6 @@ void PeekableReadBuffer::checkStateCorrect() const throw DB::Exception("Pos in empty own buffer", ErrorCodes::LOGICAL_ERROR); if (unread_limit < memory.size()) throw DB::Exception("Size limit exceed", ErrorCodes::LOGICAL_ERROR); -#endif } void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append) @@ -245,11 +247,10 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append) void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos() { -#ifndef NDEBUG if (!checkpoint) throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR); checkStateCorrect(); -#endif + if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory()) return; /// is't already continuous diff --git a/src/IO/ReadBuffer.h b/src/IO/ReadBuffer.h index 664e2567006..a35e5206e49 100644 --- a/src/IO/ReadBuffer.h +++ b/src/IO/ReadBuffer.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -41,6 +42,11 @@ public: */ ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {} + // Copying the read buffers can be dangerous because they can hold a lot of + // memory or open files, so better to disable the copy constructor to prevent + // accidental copying. + ReadBuffer(const ReadBuffer &) = delete; + // FIXME: behavior differs greately from `BufferBase::set()` and it's very confusing. void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); working_buffer.resize(0); } @@ -54,8 +60,8 @@ public: if (!res) working_buffer.resize(0); - pos = working_buffer.begin() + working_buffer_offset; - working_buffer_offset = 0; + pos = working_buffer.begin() + nextimpl_working_buffer_offset; + nextimpl_working_buffer_offset = 0; return res; } @@ -169,8 +175,10 @@ public: } protected: - /// The number of bytes to ignore from the initial position of `working_buffer` buffer. - size_t working_buffer_offset = 0; + /// The number of bytes to ignore from the initial position of `working_buffer` + /// buffer. Apparently this is an additional out-parameter for nextImpl(), + /// not a real field. + size_t nextimpl_working_buffer_offset = 0; private: /** Read the next data and fill a buffer with it. diff --git a/src/IO/ReadBufferAIO.cpp b/src/IO/ReadBufferAIO.cpp index abf55021cfb..39e3c26826a 100644 --- a/src/IO/ReadBufferAIO.cpp +++ b/src/IO/ReadBufferAIO.cpp @@ -298,7 +298,7 @@ void ReadBufferAIO::finalize() first_unread_pos_in_file += bytes_read; total_bytes_read += bytes_read; - working_buffer_offset = region_left_padding; + nextimpl_working_buffer_offset = region_left_padding; if (total_bytes_read == max_bytes_read) is_eof = true; diff --git a/src/IO/ReadBufferFromFile.h b/src/IO/ReadBufferFromFile.h index 9218e0738ca..a621553daae 100644 --- a/src/IO/ReadBufferFromFile.h +++ b/src/IO/ReadBufferFromFile.h @@ -32,8 +32,6 @@ public: ReadBufferFromFile(int fd, const std::string & original_file_name = {}, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0); - ReadBufferFromFile(ReadBufferFromFile &&) = default; - ~ReadBufferFromFile() override; /// Close file before destruction of object. diff --git a/src/IO/ReadBufferFromFileBase.h b/src/IO/ReadBufferFromFileBase.h index 767f0a3b2b0..f2a724e03a4 100644 --- a/src/IO/ReadBufferFromFileBase.h +++ b/src/IO/ReadBufferFromFileBase.h @@ -17,7 +17,6 @@ class ReadBufferFromFileBase : public BufferWithOwnMemory public: ReadBufferFromFileBase(); ReadBufferFromFileBase(size_t buf_size, char * existing_memory, size_t alignment); - ReadBufferFromFileBase(ReadBufferFromFileBase &&) = default; ~ReadBufferFromFileBase() override; virtual std::string getFileName() const = 0; diff --git a/src/IO/ReadBufferFromFileDescriptor.cpp b/src/IO/ReadBufferFromFileDescriptor.cpp index 3a567f6d7ca..0ab07b85027 100644 --- a/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/src/IO/ReadBufferFromFileDescriptor.cpp @@ -85,7 +85,7 @@ bool ReadBufferFromFileDescriptor::nextImpl() } } - pos_in_file += bytes_read; + file_offset_of_buffer_end += bytes_read; if (bytes_read) { @@ -102,22 +102,35 @@ bool ReadBufferFromFileDescriptor::nextImpl() /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence) { - off_t new_pos; + size_t new_pos; if (whence == SEEK_SET) + { + assert(offset >= 0); new_pos = offset; + } else if (whence == SEEK_CUR) - new_pos = pos_in_file - (working_buffer.end() - pos) + offset; + { + new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset; + } else + { throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + } /// Position is unchanged. - if (new_pos + (working_buffer.end() - pos) == pos_in_file) + if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) return new_pos; - if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast(working_buffer.size())) + // file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos, + // so the second inequality is strict. + if (file_offset_of_buffer_end - working_buffer.size() <= static_cast(new_pos) + && new_pos < file_offset_of_buffer_end) { /// Position is still inside buffer. - pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size())); + pos = working_buffer.end() - file_offset_of_buffer_end + new_pos; + assert(pos >= working_buffer.begin()); + assert(pos < working_buffer.end()); + return new_pos; } else @@ -130,7 +143,7 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence) if (-1 == res) throwFromErrnoWithPath("Cannot seek through file " + getFileName(), getFileName(), ErrorCodes::CANNOT_SEEK_THROUGH_FILE); - pos_in_file = new_pos; + file_offset_of_buffer_end = new_pos; watch.stop(); ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds()); diff --git a/src/IO/ReadBufferFromFileDescriptor.h b/src/IO/ReadBufferFromFileDescriptor.h index bb170d472f1..0779b215067 100644 --- a/src/IO/ReadBufferFromFileDescriptor.h +++ b/src/IO/ReadBufferFromFileDescriptor.h @@ -14,7 +14,7 @@ class ReadBufferFromFileDescriptor : public ReadBufferFromFileBase { protected: int fd; - off_t pos_in_file; /// What offset in file corresponds to working_buffer.end(). + size_t file_offset_of_buffer_end; /// What offset in file corresponds to working_buffer.end(). bool nextImpl() override; @@ -23,9 +23,7 @@ protected: public: ReadBufferFromFileDescriptor(int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0) - : ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), pos_in_file(0) {} - - ReadBufferFromFileDescriptor(ReadBufferFromFileDescriptor &&) = default; + : ReadBufferFromFileBase(buf_size, existing_memory, alignment), fd(fd_), file_offset_of_buffer_end(0) {} int getFD() const { @@ -34,7 +32,7 @@ public: off_t getPosition() override { - return pos_in_file - (working_buffer.end() - pos); + return file_offset_of_buffer_end - (working_buffer.end() - pos); } /// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen. diff --git a/src/IO/ReadBufferFromHDFS.h b/src/IO/ReadBufferFromHDFS.h index e27159f2e98..58479d45660 100644 --- a/src/IO/ReadBufferFromHDFS.h +++ b/src/IO/ReadBufferFromHDFS.h @@ -19,7 +19,6 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory std::unique_ptr impl; public: ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default; ~ReadBufferFromHDFS() override; bool nextImpl() override; diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp index d4574732c58..900e9c7b535 100644 --- a/src/IO/ReadHelpers.cpp +++ b/src/IO/ReadHelpers.cpp @@ -1100,9 +1100,14 @@ bool loadAtPosition(ReadBuffer & in, DB::Memory<> & memory, char * & current) return true; saveUpToPosition(in, memory, current); + bool loaded_more = !in.eof(); - assert(in.position() == in.buffer().begin()); + // A sanity check. Buffer position may be in the beginning of the buffer + // (normal case), or have some offset from it (AIO). + assert(in.position() >= in.buffer().begin()); + assert(in.position() <= in.buffer().end()); current = in.position(); + return loaded_more; } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 87b5da991a9..46237bd6ef4 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -697,7 +697,7 @@ void executeQuery( const char * end; /// If 'istr' is empty now, fetch next data into buffer. - if (istr.buffer().size() == 0) + if (!istr.hasPendingData()) istr.next(); size_t max_query_size = context.getSettingsRef().max_query_size; diff --git a/src/Parsers/parseQuery.cpp b/src/Parsers/parseQuery.cpp index 7982b46d8b8..f4e4c195506 100644 --- a/src/Parsers/parseQuery.cpp +++ b/src/Parsers/parseQuery.cpp @@ -135,7 +135,13 @@ void writeCommonErrorMessage( out << ": failed at position " << (last_token.begin - begin + 1); if (last_token.type == TokenType::EndOfStream || last_token.type == TokenType::Semicolon) + { out << " (end of query)"; + } + else + { + out << " ('" << std::string(last_token.begin, last_token.end - last_token.begin) << "')"; + } /// If query is multiline. const char * nl = find_first_symbols<'\n'>(begin, end);