#pragma once #include #include #include #include namespace DB { /// Информация для профайлинга. struct BlockStreamProfileInfo { bool started; Poco::Stopwatch work_stopwatch; /// Время вычислений (выполнения функции read()) Poco::Stopwatch total_stopwatch; /// Время с учётом ожидания size_t rows; size_t blocks; size_t bytes; /// Информация о вложенных потоках - для выделения чистого времени работы. typedef std::vector BlockStreamProfileInfos; BlockStreamProfileInfos nested_infos; String column_names; BlockStreamProfileInfo() : started(false), rows(0), blocks(0), bytes(0) {} void update(Block & block); void print(std::ostream & ostr) const; }; /** Смотрит за тем, как работает источник блоков. * Позволяет получить информацию для профайлинга: * строк в секунду, блоков в секунду, мегабайт в секунду и т. п. * Позволяет остановить чтение данных (во вложенных источниках). */ class IProfilingBlockInputStream : public IBlockInputStream { public: IProfilingBlockInputStream() : is_cancelled(false) {} Block read(); /// Наследники должны реализовать эту функцию. virtual Block readImpl() = 0; /// Получить информацию о скорости выполнения. const BlockStreamProfileInfo & getInfo() const; /** Попросить прервать получение данных как можно скорее. * По-умолчанию - просто выставляет флаг is_cancelled и просит прерваться всех детей. */ virtual void cancel(); /** Установить колбэк, который вызывается, чтобы проверить, не был ли запрос остановлен. * Колбэк пробрасывается во все листовые источники и вызывается там перед чтением данных. * Следует иметь ввиду, что колбэк может вызываться из разных потоков. */ typedef boost::function IsCancelledCallback; void setIsCancelledCallback(IsCancelledCallback callback); /** Установить колбэк прогресса выполнения. * Колбэк пробрасывается во все источники. * По-умолчанию, он вызывается для листовых источников, после каждого блока. * (Но это может быть переопределено в методе progress()) * Функция принимает количество строк в последнем блоке, количество байт в последнем блоке. * Следует иметь ввиду, что колбэк может вызываться из разных потоков. */ typedef boost::function ProgressCallback; void setProgressCallback(ProgressCallback callback); virtual void progress(Block & block); protected: BlockStreamProfileInfo info; volatile bool is_cancelled; IsCancelledCallback is_cancelled_callback; ProgressCallback progress_callback; }; }