mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
minor fixes near serializations
This commit is contained in:
parent
f37d12d16e
commit
84e914e05a
@ -185,7 +185,7 @@ StringRef ColumnArray::getDataAt(size_t n) const
|
||||
bool ColumnArray::isDefaultAt(size_t n) const
|
||||
{
|
||||
const auto & offsets_data = getOffsets();
|
||||
return offsets_data[n] == offsets_data[n - 1];
|
||||
return offsets_data[n] == offsets_data[static_cast<ssize_t>(n) - 1];
|
||||
}
|
||||
|
||||
|
||||
|
@ -47,7 +47,7 @@ ColumnSparse::ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offs
|
||||
|
||||
if (!offsets_concrete->empty() && _size <= offsets_concrete->getData().back())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Size sparse columns ({}) should be greater than last position of non-default value ({})",
|
||||
"Size of sparse column ({}) should be greater than last position of non-default value ({})",
|
||||
_size, offsets_concrete->getData().back());
|
||||
|
||||
#ifndef NDEBUG
|
||||
|
@ -1,365 +0,0 @@
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
|
||||
#include <Core/Field.h>
|
||||
#include <Interpreters/ProcessList.h>
|
||||
#include <Access/EnabledQuota.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event ThrottlerSleepMicroseconds;
|
||||
extern const Event SelectedRows;
|
||||
extern const Event SelectedBytes;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
extern const int TOO_MANY_ROWS;
|
||||
extern const int TOO_MANY_BYTES;
|
||||
extern const int TOO_MANY_ROWS_OR_BYTES;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
/// It's safe to access children without mutex as long as these methods are called before first call to `read()` or `readPrefix()`.
|
||||
|
||||
|
||||
Block IBlockInputStream::read()
|
||||
{
|
||||
if (total_rows_approx)
|
||||
{
|
||||
progressImpl(Progress(0, 0, total_rows_approx));
|
||||
total_rows_approx = 0;
|
||||
}
|
||||
|
||||
if (!info.started)
|
||||
{
|
||||
info.total_stopwatch.start();
|
||||
info.started = true;
|
||||
}
|
||||
|
||||
Block res;
|
||||
|
||||
if (isCancelledOrThrowIfKilled())
|
||||
return res;
|
||||
|
||||
if (!checkTimeLimit())
|
||||
limit_exceeded_need_break = true;
|
||||
|
||||
if (!limit_exceeded_need_break)
|
||||
res = readImpl();
|
||||
|
||||
if (res)
|
||||
{
|
||||
info.update(res);
|
||||
|
||||
if (enabled_extremes)
|
||||
updateExtremes(res);
|
||||
|
||||
if (limits.mode == LimitsMode::LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
|
||||
limit_exceeded_need_break = true;
|
||||
|
||||
if (quota)
|
||||
checkQuota(res);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** If the stream is over, then we will ask all children to abort the execution.
|
||||
* This makes sense when running a query with LIMIT
|
||||
* - there is a situation when all the necessary data has already been read,
|
||||
* but children sources are still working,
|
||||
* herewith they can work in separate threads or even remotely.
|
||||
*/
|
||||
cancel(false);
|
||||
}
|
||||
|
||||
progress(Progress(res.rows(), res.bytes()));
|
||||
|
||||
#ifndef NDEBUG
|
||||
if (res)
|
||||
{
|
||||
Block header = getHeader();
|
||||
if (header)
|
||||
{
|
||||
if (columnsCanDifferInRepresentationAmongBlocks())
|
||||
assertCompatibleHeader(res, header, getName());
|
||||
else
|
||||
assertBlocksHaveEqualStructure(res, header, getName());
|
||||
}
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::readPrefix()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
if (!read_prefix_is_called)
|
||||
read_prefix_is_called = true;
|
||||
else
|
||||
throw Exception("readPrefix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
|
||||
#endif
|
||||
|
||||
readPrefixImpl();
|
||||
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
child.readPrefix();
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::readSuffix()
|
||||
{
|
||||
#ifndef NDEBUG
|
||||
if (!read_suffix_is_called)
|
||||
read_suffix_is_called = true;
|
||||
else
|
||||
throw Exception("readSuffix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
|
||||
#endif
|
||||
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
child.readSuffix();
|
||||
return false;
|
||||
});
|
||||
|
||||
readSuffixImpl();
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::updateExtremes(Block & block)
|
||||
{
|
||||
size_t num_columns = block.columns();
|
||||
|
||||
if (!extremes)
|
||||
{
|
||||
MutableColumns extremes_columns(num_columns);
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
const ColumnPtr & src = block.safeGetByPosition(i).column;
|
||||
|
||||
if (isColumnConst(*src))
|
||||
{
|
||||
/// Equal min and max.
|
||||
extremes_columns[i] = src->cloneResized(2);
|
||||
}
|
||||
else
|
||||
{
|
||||
Field min_value;
|
||||
Field max_value;
|
||||
|
||||
src->getExtremes(min_value, max_value);
|
||||
|
||||
extremes_columns[i] = src->cloneEmpty();
|
||||
|
||||
extremes_columns[i]->insert(min_value);
|
||||
extremes_columns[i]->insert(max_value);
|
||||
}
|
||||
}
|
||||
|
||||
extremes = block.cloneWithColumns(std::move(extremes_columns));
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
{
|
||||
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
|
||||
|
||||
if (isColumnConst(*old_extremes))
|
||||
continue;
|
||||
|
||||
Field min_value = (*old_extremes)[0];
|
||||
Field max_value = (*old_extremes)[1];
|
||||
|
||||
Field cur_min_value;
|
||||
Field cur_max_value;
|
||||
|
||||
block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
|
||||
|
||||
if (cur_min_value < min_value)
|
||||
min_value = cur_min_value;
|
||||
if (cur_max_value > max_value)
|
||||
max_value = cur_max_value;
|
||||
|
||||
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
|
||||
|
||||
new_extremes->insert(min_value);
|
||||
new_extremes->insert(max_value);
|
||||
|
||||
old_extremes = std::move(new_extremes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool IBlockInputStream::checkTimeLimit() const
|
||||
{
|
||||
return limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode);
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::checkQuota(Block & block)
|
||||
{
|
||||
switch (limits.mode)
|
||||
{
|
||||
case LimitsMode::LIMITS_TOTAL:
|
||||
/// Checked in `progress` method.
|
||||
break;
|
||||
|
||||
case LimitsMode::LIMITS_CURRENT:
|
||||
{
|
||||
UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
|
||||
quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
|
||||
prev_elapsed = total_elapsed;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::progressImpl(const Progress & value)
|
||||
{
|
||||
if (progress_callback)
|
||||
progress_callback(value);
|
||||
|
||||
if (process_list_elem)
|
||||
{
|
||||
if (!process_list_elem->updateProgressIn(value))
|
||||
cancel(/* kill */ true);
|
||||
|
||||
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
|
||||
|
||||
ProgressValues progress = process_list_elem->getProgressIn();
|
||||
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
|
||||
|
||||
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
|
||||
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
|
||||
*/
|
||||
if (limits.mode == LimitsMode::LIMITS_TOTAL)
|
||||
{
|
||||
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
|
||||
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
|
||||
cancel(false);
|
||||
}
|
||||
|
||||
size_t total_rows = progress.total_rows_to_read;
|
||||
|
||||
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
|
||||
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
|
||||
|
||||
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
|
||||
{
|
||||
CurrentThread::updatePerformanceCounters();
|
||||
last_profile_events_update_time = total_elapsed_microseconds;
|
||||
}
|
||||
|
||||
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
|
||||
|
||||
if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)
|
||||
quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows);
|
||||
ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes);
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::cancel(bool kill)
|
||||
{
|
||||
if (kill)
|
||||
is_killed = true;
|
||||
|
||||
bool old_val = false;
|
||||
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
|
||||
return;
|
||||
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
child.cancel(kill);
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
bool IBlockInputStream::isCancelled() const
|
||||
{
|
||||
return is_cancelled;
|
||||
}
|
||||
|
||||
bool IBlockInputStream::isCancelledOrThrowIfKilled() const
|
||||
{
|
||||
if (!is_cancelled)
|
||||
return false;
|
||||
if (is_killed)
|
||||
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::setProgressCallback(const ProgressCallback & callback)
|
||||
{
|
||||
progress_callback = callback;
|
||||
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
child.setProgressCallback(callback);
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
void IBlockInputStream::setProcessListElement(QueryStatus * elem)
|
||||
{
|
||||
process_list_elem = elem;
|
||||
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
child.setProcessListElement(elem);
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Block IBlockInputStream::getTotals()
|
||||
{
|
||||
if (totals)
|
||||
return totals;
|
||||
|
||||
Block res;
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
res = child.getTotals();
|
||||
return bool(res);
|
||||
});
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
Block IBlockInputStream::getExtremes()
|
||||
{
|
||||
if (extremes)
|
||||
return extremes;
|
||||
|
||||
Block res;
|
||||
forEachChild([&] (IBlockInputStream & child)
|
||||
{
|
||||
res = child.getExtremes();
|
||||
return bool(res);
|
||||
});
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
@ -1,275 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <DataStreams/BlockStreamProfileInfo.h>
|
||||
#include <DataStreams/ExecutionSpeedLimits.h>
|
||||
#include <DataStreams/SizeLimits.h>
|
||||
#include <DataStreams/StreamLocalLimits.h>
|
||||
#include <IO/Progress.h>
|
||||
#include <Storages/TableLockHolder.h>
|
||||
#include <Common/TypePromotion.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <shared_mutex>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
}
|
||||
|
||||
class ProcessListElement;
|
||||
class EnabledQuota;
|
||||
class QueryStatus;
|
||||
|
||||
|
||||
/** The stream interface for reading data by blocks from the database.
|
||||
* Relational operations are supposed to be done also as implementations of this interface.
|
||||
* Watches out at how the source of the blocks works.
|
||||
* Lets you get information for profiling: rows per second, blocks per second, megabytes per second, etc.
|
||||
* Allows you to stop reading data (in nested sources).
|
||||
*/
|
||||
class IBlockInputStream : public TypePromotion<IBlockInputStream>
|
||||
{
|
||||
friend struct BlockStreamProfileInfo;
|
||||
|
||||
public:
|
||||
IBlockInputStream() { info.parent = this; }
|
||||
virtual ~IBlockInputStream() = default;
|
||||
|
||||
IBlockInputStream(const IBlockInputStream &) = delete;
|
||||
IBlockInputStream & operator=(const IBlockInputStream &) = delete;
|
||||
|
||||
/// To output the data stream transformation tree (query execution plan).
|
||||
virtual String getName() const = 0;
|
||||
|
||||
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
|
||||
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
|
||||
* It is guaranteed that method "read" returns blocks of exactly that structure.
|
||||
*/
|
||||
virtual Block getHeader() const = 0;
|
||||
|
||||
virtual const BlockMissingValues & getMissingValues() const
|
||||
{
|
||||
static const BlockMissingValues none;
|
||||
return none;
|
||||
}
|
||||
|
||||
/** Read next block.
|
||||
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
|
||||
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
|
||||
* This also applies for readPrefix, readSuffix.
|
||||
*/
|
||||
Block read();
|
||||
|
||||
/** Read something before starting all data or after the end of all data.
|
||||
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
|
||||
* readPrefix() must be called before the first call to read().
|
||||
* readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution.
|
||||
*/
|
||||
|
||||
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
|
||||
* There are cases when you do not want `readPrefix` of children to be called synchronously, in this function,
|
||||
* but you want them to be called, for example, in separate threads (for parallel initialization of children).
|
||||
* Then overload `readPrefix` function.
|
||||
*/
|
||||
virtual void readPrefix();
|
||||
|
||||
/** The default implementation calls recursively readSuffix() on all children, and then readSuffixImpl() on itself.
|
||||
* If this stream calls read() in children in a separate thread, this behavior is usually incorrect:
|
||||
* readSuffix() of the child can not be called at the moment when the same child's read() is executed in another thread.
|
||||
* In this case, you need to override this method so that readSuffix() in children is called, for example, after connecting streams.
|
||||
*/
|
||||
virtual void readSuffix();
|
||||
|
||||
/// Do not allow to change the table while the blocks stream and its children are alive.
|
||||
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
|
||||
|
||||
/// Get information about execution speed.
|
||||
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
|
||||
|
||||
/** Get "total" values.
|
||||
* The default implementation takes them from itself or from the first child source in which they are.
|
||||
* The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
|
||||
* There can be no total values - then an empty block is returned.
|
||||
*
|
||||
* Call this method only after all the data has been retrieved with `read`,
|
||||
* otherwise there will be problems if any data at the same time is computed in another thread.
|
||||
*/
|
||||
virtual Block getTotals();
|
||||
|
||||
/// The same for minimums and maximums.
|
||||
virtual Block getExtremes();
|
||||
|
||||
|
||||
/** Set the execution progress bar callback.
|
||||
* The callback is passed to all child sources.
|
||||
* By default, it is called for leaf sources, after each block.
|
||||
* (But this can be overridden in the progress() method)
|
||||
* The function takes the number of rows in the last block, the number of bytes in the last block.
|
||||
* Note that the callback can be called from different threads.
|
||||
*/
|
||||
virtual void setProgressCallback(const ProgressCallback & callback);
|
||||
|
||||
|
||||
/** In this method:
|
||||
* - the progress callback is called;
|
||||
* - the status of the query execution in ProcessList is updated;
|
||||
* - checks restrictions and quotas that should be checked not within the same source,
|
||||
* but over the total amount of resources spent in all sources at once (information in the ProcessList).
|
||||
*/
|
||||
virtual void progress(const Progress & value)
|
||||
{
|
||||
/// The data for progress is taken from leaf sources.
|
||||
if (children.empty())
|
||||
progressImpl(value);
|
||||
}
|
||||
|
||||
void progressImpl(const Progress & value);
|
||||
|
||||
|
||||
/** Set the pointer to the process list item.
|
||||
* It is passed to all child sources.
|
||||
* General information about the resources spent on the request will be written into it.
|
||||
* Based on this information, the quota and some restrictions will be checked.
|
||||
* This information will also be available in the SHOW PROCESSLIST request.
|
||||
*/
|
||||
virtual void setProcessListElement(QueryStatus * elem);
|
||||
|
||||
/** Set the approximate total number of rows to read.
|
||||
*/
|
||||
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
|
||||
|
||||
|
||||
/** Ask to abort the receipt of data as soon as possible.
|
||||
* By default - just sets the flag is_cancelled and asks that all children be interrupted.
|
||||
* This function can be called several times, including simultaneously from different threads.
|
||||
* Have two modes:
|
||||
* with kill = false only is_cancelled is set - streams will stop silently with returning some processed data.
|
||||
* with kill = true also is_killed set - queries will stop with exception.
|
||||
*/
|
||||
virtual void cancel(bool kill);
|
||||
|
||||
bool isCancelled() const;
|
||||
bool isCancelledOrThrowIfKilled() const;
|
||||
|
||||
/** Set limitations that checked on each block. */
|
||||
virtual void setLimits(const StreamLocalLimits & limits_)
|
||||
{
|
||||
limits = limits_;
|
||||
}
|
||||
|
||||
const StreamLocalLimits & getLimits() const
|
||||
{
|
||||
return limits;
|
||||
}
|
||||
|
||||
/** Set the quota. If you set a quota on the amount of raw data,
|
||||
* then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
|
||||
*/
|
||||
virtual void setQuota(const std::shared_ptr<const EnabledQuota> & new_quota)
|
||||
{
|
||||
quota = new_quota;
|
||||
}
|
||||
|
||||
/// If true, columns with same name and type may have different representation (normal, const, sparse)
|
||||
/// in different blocks, that was read during stream execution.
|
||||
virtual bool columnsCanDifferInRepresentationAmongBlocks() const { return false; }
|
||||
|
||||
/// Enable calculation of minimums and maximums by the result columns.
|
||||
void enableExtremes() { enabled_extremes = true; }
|
||||
|
||||
protected:
|
||||
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
|
||||
/// which child streams read are protected by the locks during the lifetime of the child streams.
|
||||
std::vector<TableLockHolder> table_locks;
|
||||
|
||||
BlockInputStreams children;
|
||||
std::shared_mutex children_mutex;
|
||||
|
||||
BlockStreamProfileInfo info;
|
||||
std::atomic<bool> is_cancelled{false};
|
||||
std::atomic<bool> is_killed{false};
|
||||
ProgressCallback progress_callback;
|
||||
QueryStatus * process_list_elem = nullptr;
|
||||
/// According to total_stopwatch in microseconds
|
||||
UInt64 last_profile_events_update_time = 0;
|
||||
|
||||
/// Additional information that can be generated during the work process.
|
||||
|
||||
/// Total values during aggregation.
|
||||
Block totals;
|
||||
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
|
||||
Block extremes;
|
||||
|
||||
|
||||
void addChild(const BlockInputStreamPtr & child)
|
||||
{
|
||||
std::unique_lock lock(children_mutex);
|
||||
children.push_back(child);
|
||||
}
|
||||
|
||||
/** Check limits.
|
||||
* But only those that can be checked within each separate stream.
|
||||
*/
|
||||
bool checkTimeLimit() const;
|
||||
|
||||
#ifndef NDEBUG
|
||||
bool read_prefix_is_called = false;
|
||||
bool read_suffix_is_called = false;
|
||||
#endif
|
||||
|
||||
private:
|
||||
bool enabled_extremes = false;
|
||||
|
||||
/// The limit on the number of rows/bytes has been exceeded, and you need to stop execution on the next `read` call, as if the thread has run out.
|
||||
bool limit_exceeded_need_break = false;
|
||||
|
||||
/// Limitations and quotas.
|
||||
|
||||
StreamLocalLimits limits;
|
||||
|
||||
std::shared_ptr<const EnabledQuota> quota; /// If nullptr - the quota is not used.
|
||||
UInt64 prev_elapsed = 0;
|
||||
|
||||
/// The approximate total number of rows to read. For progress bar.
|
||||
size_t total_rows_approx = 0;
|
||||
|
||||
/// Derived classes must implement this function.
|
||||
virtual Block readImpl() = 0;
|
||||
|
||||
/// Here you can do a preliminary initialization.
|
||||
virtual void readPrefixImpl() {}
|
||||
|
||||
/// Here you need to do a finalization, which can lead to an exception.
|
||||
virtual void readSuffixImpl() {}
|
||||
|
||||
void updateExtremes(Block & block);
|
||||
|
||||
/** Check quotas.
|
||||
* But only those that can be checked within each separate stream.
|
||||
*/
|
||||
void checkQuota(Block & block);
|
||||
|
||||
size_t checkDepthImpl(size_t max_depth, size_t level) const;
|
||||
|
||||
template <typename F>
|
||||
void forEachChild(F && f)
|
||||
{
|
||||
/// NOTE: Acquire a read lock, therefore f() should be thread safe
|
||||
std::shared_lock lock(children_mutex);
|
||||
|
||||
// Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
|
||||
const auto children_copy = children;
|
||||
lock.unlock();
|
||||
|
||||
for (auto & child : children_copy)
|
||||
if (f(*child))
|
||||
return;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
#include <DataStreams/materializeBlock.h>
|
||||
#include <Columns/ColumnSparse.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
Block materializeBlock(const Block & block)
|
||||
{
|
||||
if (!block)
|
||||
return block;
|
||||
|
||||
Block res = block;
|
||||
size_t columns = res.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
{
|
||||
auto & element = res.getByPosition(i);
|
||||
element.column = recursiveRemoveSparse(element.column->convertToFullColumnIfConst());
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void materializeBlockInplace(Block & block)
|
||||
{
|
||||
for (size_t i = 0; i < block.columns(); ++i)
|
||||
block.getByPosition(i).column = recursiveRemoveSparse(block.getByPosition(i).column->convertToFullColumnIfConst());
|
||||
}
|
||||
|
||||
}
|
@ -17,7 +17,6 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INCORRECT_DATA;
|
||||
extern const int CORRUPTED_DATA;
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,7 @@ void SerializationInfoTuple::replaceData(const SerializationInfo & other)
|
||||
for (size_t i = 0; i < elems.size(); ++i)
|
||||
elems[i]->replaceData(*info_tuple.elems[i]);
|
||||
}
|
||||
|
||||
MutableSerializationInfoPtr SerializationInfoTuple::clone() const
|
||||
{
|
||||
MutableSerializationInfos elems_cloned;
|
||||
@ -70,7 +71,6 @@ MutableSerializationInfoPtr SerializationInfoTuple::clone() const
|
||||
void SerializationInfoTuple::serialializeKindBinary(WriteBuffer & out) const
|
||||
{
|
||||
SerializationInfo::serialializeKindBinary(out);
|
||||
|
||||
for (const auto & elem : elems)
|
||||
elem->serialializeKindBinary(out);
|
||||
}
|
||||
@ -78,7 +78,6 @@ void SerializationInfoTuple::serialializeKindBinary(WriteBuffer & out) const
|
||||
void SerializationInfoTuple::deserializeFromKindsBinary(ReadBuffer & in)
|
||||
{
|
||||
SerializationInfo::deserializeFromKindsBinary(in);
|
||||
|
||||
for (const auto & elem : elems)
|
||||
elem->deserializeFromKindsBinary(in);
|
||||
}
|
||||
|
@ -13,8 +13,6 @@
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/Serializations/SerializationInfo.h>
|
||||
|
||||
#include <Columns/ColumnSparse.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -1,45 +0,0 @@
|
||||
#pragma once
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class QueryPipeline;
|
||||
class PullingAsyncPipelineExecutor;
|
||||
class PullingPipelineExecutor;
|
||||
|
||||
/// Implement IBlockInputStream from QueryPipeline.
|
||||
/// It's a temporary wrapper.
|
||||
class PipelineExecutingBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
explicit PipelineExecutingBlockInputStream(QueryPipeline pipeline_);
|
||||
~PipelineExecutingBlockInputStream() override;
|
||||
|
||||
String getName() const override { return "PipelineExecuting"; }
|
||||
Block getHeader() const override;
|
||||
bool columnsCanDifferInRepresentationAmongBlocks() const override { return true; }
|
||||
|
||||
void cancel(bool kill) override;
|
||||
|
||||
/// Implement IBlockInputStream methods via QueryPipeline.
|
||||
void setProgressCallback(const ProgressCallback & callback) final;
|
||||
void setProcessListElement(QueryStatus * elem) final;
|
||||
void setLimits(const StreamLocalLimits & limits_) final;
|
||||
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final;
|
||||
|
||||
protected:
|
||||
void readPrefixImpl() override;
|
||||
Block readImpl() override;
|
||||
|
||||
private:
|
||||
std::unique_ptr<QueryPipeline> pipeline;
|
||||
/// One of executors is used.
|
||||
std::unique_ptr<PullingPipelineExecutor> executor; /// for single thread.
|
||||
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor; /// for many threads.
|
||||
bool is_execution_started = false;
|
||||
|
||||
void createExecutor();
|
||||
};
|
||||
|
||||
}
|
@ -4,8 +4,6 @@
|
||||
#include <Columns/ColumnsCommon.h>
|
||||
#include <Core/Field.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -1,53 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Convert one block structure to another:
|
||||
*
|
||||
* Leaves only necessary columns;
|
||||
*
|
||||
* Columns are searched in source first by name;
|
||||
* and if there is no column with same name, then by position.
|
||||
*
|
||||
* Converting types of matching columns (with CAST function).
|
||||
*
|
||||
* Materializing columns which are const in source and non-const in result,
|
||||
* throw if they are const in result and non const in source,
|
||||
* or if they are const and have different values.
|
||||
*/
|
||||
class ConvertingBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
enum class MatchColumnsMode
|
||||
{
|
||||
/// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names.
|
||||
Position,
|
||||
/// Find columns in source by their names. Allow excessive columns in source.
|
||||
Name
|
||||
};
|
||||
|
||||
ConvertingBlockInputStream(
|
||||
const BlockInputStreamPtr & input,
|
||||
const Block & result_header,
|
||||
MatchColumnsMode mode);
|
||||
|
||||
String getName() const override { return "Converting"; }
|
||||
Block getHeader() const override { return header; }
|
||||
bool columnsCanDifferInRepresentationAmongBlocks() const override { return true; }
|
||||
|
||||
private:
|
||||
Block readImpl() override;
|
||||
|
||||
Block header;
|
||||
|
||||
/// How to construct result block. Position in source block, where to get each column.
|
||||
using Conversion = std::vector<size_t>;
|
||||
Conversion conversion;
|
||||
};
|
||||
|
||||
}
|
@ -400,9 +400,6 @@ public:
|
||||
/// part creation (using alter query with materialize_ttl setting).
|
||||
bool checkAllTTLCalculated(const StorageMetadataPtr & metadata_snapshot) const;
|
||||
|
||||
// /// Returns serialization for column according to serialization_info.
|
||||
// SerializationPtr getSerializationForColumn(const NameAndTypePair & column) const;
|
||||
|
||||
/// Return some uniq string for file
|
||||
/// Required for distinguish different copies of the same part on S3
|
||||
String getUniqueId() const;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <base/logger_useful.h>
|
||||
#include <Common/ActionBlocker.h>
|
||||
|
||||
#include <DataTypes/Serializations/SerializationInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/IMergeTreeDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
|
||||
|
@ -8,7 +8,6 @@
|
||||
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
|
||||
#include <Storages/MergeTree/ColumnSizeEstimator.h>
|
||||
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
|
||||
#include <DataTypes/Serializations/SerializationInfo.h>
|
||||
#include <Processors/Transforms/ColumnGathererTransform.h>
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
|
@ -17,8 +17,6 @@
|
||||
#include <Compression/CompressedWriteBuffer.h>
|
||||
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <DataTypes/Serializations/SerializationInfo.h>
|
||||
|
||||
#include <Columns/ColumnArray.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
|
@ -39,8 +39,8 @@ def create_table(cluster, table_name, additional_settings=None):
|
||||
|
||||
FILES_OVERHEAD = 1
|
||||
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
|
||||
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1 + 1
|
||||
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1 + 1
|
||||
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
|
||||
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
|
||||
|
||||
|
||||
def random_string(length):
|
||||
|
@ -67,8 +67,8 @@ def cluster():
|
||||
|
||||
FILES_OVERHEAD = 1
|
||||
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
|
||||
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1 + 1
|
||||
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1 + 1
|
||||
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
|
||||
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
|
||||
|
||||
|
||||
def random_string(length):
|
||||
|
Loading…
Reference in New Issue
Block a user