Merge branch 'master' into add_jepsen_test

Commit e63b2f0de8 by alesapin, 2021-12-22 13:30:48 +03:00
36 changed files with 473 additions and 201 deletions


@ -953,7 +953,7 @@ jobs:
- BuilderDebMsan
- BuilderDebDebug
runs-on: [self-hosted, style-checker]
if: always()
if: ${{ success() || failure() }}
steps:
- name: Set envs
run: |
@ -993,7 +993,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinPPC64
runs-on: [self-hosted, style-checker]
if: always()
if: ${{ success() || failure() }}
steps:
- name: Set envs
run: |

.github/workflows/woboq.yml (new file, 41 lines)

@ -0,0 +1,41 @@
name: WoboqBuilder
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
concurrency:
group: woboq
on: # yamllint disable-line rule:truthy
schedule:
- cron: '0 */18 * * *'
jobs:
# don't use dockerhub push because this image updates so rarely
WoboqCodebrowser:
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/codebrowser
REPO_COPY=${{runner.temp}}/codebrowser/ClickHouse
IMAGES_PATH=${{runner.temp}}/images_path
EOF
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'true'
- name: Codebrowser
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci && python3 codebrowser_check.py
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH


@ -6,7 +6,7 @@ FROM clickhouse/binary-builder
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-9 libllvm9 libclang-9-dev
RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-13 libllvm13 libclang-13-dev
# repo versions don't work correctly with C++17
# also we push reports to s3, so we add index.html to subfolder urls
@ -23,12 +23,12 @@ ENV SOURCE_DIRECTORY=/repo_folder
ENV BUILD_DIRECTORY=/build
ENV HTML_RESULT_DIRECTORY=$BUILD_DIRECTORY/html_report
ENV SHA=nosha
ENV DATA="data"
ENV DATA="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data"
CMD mkdir -p $BUILD_DIRECTORY && cd $BUILD_DIRECTORY && \
cmake $SOURCE_DIRECTORY -DCMAKE_CXX_COMPILER=/usr/bin/clang\+\+-13 -DCMAKE_C_COMPILER=/usr/bin/clang-13 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON -DENABLE_EMBEDDED_COMPILER=0 -DENABLE_S3=0 && \
mkdir -p $HTML_RESULT_DIRECTORY && \
$CODEGEN -b $BUILD_DIRECTORY -a -o $HTML_RESULT_DIRECTORY -p ClickHouse:$SOURCE_DIRECTORY:$SHA -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \
cp -r $STATIC_DATA $HTML_RESULT_DIRECTORY/ &&\
$CODEINDEX $HTML_RESULT_DIRECTORY -d $DATA | ts '%Y-%m-%d %H:%M:%S' && \
$CODEINDEX $HTML_RESULT_DIRECTORY -d "$DATA" | ts '%Y-%m-%d %H:%M:%S' && \
mv $HTML_RESULT_DIRECTORY /test_output


@ -14,11 +14,11 @@ To enable Kerberos, one should include `kerberos` section in `config.xml`. This
#### Parameters:
- `principal` - canonical service principal name that will be acquired and used when accepting security contexts.
- This parameter is optional, if omitted, the default principal will be used.
- This parameter is optional, if omitted, the default principal will be used.
- `realm` - a realm, that will be used to restrict authentication to only those requests whose initiator's realm matches it.
- This parameter is optional, if omitted, no additional filtering by realm will be applied.
- This parameter is optional, if omitted, no additional filtering by realm will be applied.
Example (goes into `config.xml`):
@ -75,7 +75,7 @@ In order to enable Kerberos authentication for the user, specify `kerberos` sect
Parameters:
- `realm` - a realm that will be used to restrict authentication to only those requests whose initiator's realm matches it.
- This parameter is optional, if omitted, no additional filtering by realm will be applied.
- This parameter is optional, if omitted, no additional filtering by realm will be applied.
Example (goes into `users.xml`):


@ -259,6 +259,8 @@
M(RemoteFSUnusedPrefetches, "Number of prefetches pending at buffer destruction") \
M(RemoteFSPrefetchedReads, "Number of reads from prefetched buffer") \
M(RemoteFSUnprefetchedReads, "Number of reads from unprefetched buffer") \
M(RemoteFSLazySeeks, "Number of lazy seeks") \
M(RemoteFSSeeksWithReset, "Number of seeks which lead to a new connection") \
M(RemoteFSBuffers, "Number of buffers created for asynchronous reading from remote filesystem") \
\
M(ReadBufferSeekCancelConnection, "Number of seeks which lead to new connection (s3, http)") \


@ -21,6 +21,8 @@ namespace ProfileEvents
extern const Event RemoteFSUnusedPrefetches;
extern const Event RemoteFSPrefetchedReads;
extern const Event RemoteFSUnprefetchedReads;
extern const Event RemoteFSLazySeeks;
extern const Event RemoteFSSeeksWithReset;
extern const Event RemoteFSBuffers;
}
@ -152,11 +154,16 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
Stopwatch watch;
{
size = prefetch_future.get();
auto result = prefetch_future.get();
size = result.size;
auto offset = result.offset;
assert(offset < size);
if (size)
{
memory.swap(prefetch_buffer);
set(memory.data(), memory.size());
size -= offset;
set(memory.data() + offset, size);
working_buffer.resize(size);
file_offset_of_buffer_end += size;
}
@ -168,16 +175,23 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
else
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
size = readInto(memory.data(), memory.size()).get();
auto result = readInto(memory.data(), memory.size()).get();
size = result.size;
auto offset = result.offset;
assert(offset < size);
if (size)
{
set(memory.data(), memory.size());
size -= offset;
set(memory.data() + offset, size);
working_buffer.resize(size);
file_offset_of_buffer_end += size;
}
}
if (file_offset_of_buffer_end != impl->offset())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected equality {} == {}. It's a bug", file_offset_of_buffer_end, impl->offset());
prefetch_future = {};
return size;
}
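
To make the new {size, offset} contract concrete: offset counts leading bytes that were read only to satisfy a lazy seek and must be skipped, so the usable window starts at data + offset and the logical position advances by size - offset. A minimal standalone sketch of that arithmetic (hypothetical names, not the ClickHouse buffer classes):

    #include <cassert>
    #include <cstddef>
    #include <cstring>
    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical mirror of the {size, offset} pair returned by the reader:
    // 'size' is the number of bytes placed into the buffer, 'offset' is how many
    // leading bytes were read only to satisfy a lazy seek and must be skipped.
    struct ReadResult
    {
        size_t size = 0;
        size_t offset = 0;
    };

    int main()
    {
        std::vector<char> memory(16, 0);

        // Pretend the reader filled 10 bytes, the first 3 of which are to be ignored.
        std::memcpy(memory.data(), "xxxABCDEFG", 10);
        ReadResult result{10, 3};

        size_t file_offset_of_buffer_end = 100;   // logical position before this read

        size_t size = result.size;
        size_t offset = result.offset;
        assert(offset < size);

        size -= offset;                                        // usable byte count
        const char * working_begin = memory.data() + offset;   // skip the ignored prefix
        file_offset_of_buffer_end += size;                     // advance by usable bytes only

        std::cout << std::string(working_begin, size)
                  << " ends at offset " << file_offset_of_buffer_end << "\n";   // ABCDEFG ends at offset 107
        return 0;
    }
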
@ -231,18 +245,22 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
pos = working_buffer.end();
/// Note: we read in range [file_offset_of_buffer_end, read_until_position).
if (read_until_position && file_offset_of_buffer_end < *read_until_position
&& static_cast<off_t>(file_offset_of_buffer_end) >= getPosition()
&& static_cast<off_t>(file_offset_of_buffer_end) < getPosition() + static_cast<off_t>(min_bytes_for_seek))
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
* Note: we read in range [file_offset_of_buffer_end, read_until_position).
*/
off_t file_offset_before_seek = impl->offset();
if (impl->initialized()
&& read_until_position && file_offset_of_buffer_end < *read_until_position
&& static_cast<off_t>(file_offset_of_buffer_end) > file_offset_before_seek
&& static_cast<off_t>(file_offset_of_buffer_end) < file_offset_before_seek + static_cast<off_t>(min_bytes_for_seek))
{
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.
*/
bytes_to_ignore = file_offset_of_buffer_end - getPosition();
ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
bytes_to_ignore = file_offset_of_buffer_end - file_offset_before_seek;
}
else
{
ProfileEvents::increment(ProfileEvents::RemoteFSSeeksWithReset);
impl->reset();
}
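
The seek() change above boils down to one decision: a small forward jump that stays inside the requested read range is served lazily (remember how many bytes to ignore, counted as RemoteFSLazySeeks), anything else resets the underlying reader (RemoteFSSeeksWithReset). A compact, self-contained sketch of that decision with hypothetical names, not the real classes:

    #include <cstddef>
    #include <iostream>
    #include <optional>

    // Returns the number of bytes to skip lazily on the next read, or std::nullopt
    // when the underlying reader should be reset and restarted at new_position.
    std::optional<size_t> planSeek(
        size_t new_position,               // seek target (future file_offset_of_buffer_end)
        size_t impl_offset,                // current offset of the underlying reader
        std::optional<size_t> read_until,  // exclusive end of the requested read range, if any
        size_t min_bytes_for_seek,         // window where skipping beats reconnecting
        bool impl_initialized)
    {
        if (impl_initialized
            && read_until && new_position < *read_until
            && new_position > impl_offset
            && new_position < impl_offset + min_bytes_for_seek)
            return new_position - impl_offset;   // lazy seek: becomes bytes_to_ignore
        return std::nullopt;                     // seek with reset
    }

    int main()
    {
        // Small forward jump inside the window: ignore 512 bytes lazily.
        std::cout << planSeek(1536, 1024, 1 << 20, 4096, true).value_or(0) << "\n";    // 512
        // Jump beyond the window: reset the reader instead.
        std::cout << planSeek(1 << 16, 1024, 1 << 20, 4096, true).has_value() << "\n"; // 0
        return 0;
    }
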


@ -65,7 +65,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata
}
size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
ReadBufferFromRemoteFSGather::ReadResult ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
{
/**
* Set `data` to current working and internal buffers.
@ -73,23 +73,24 @@ size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t o
*/
set(data, size);
absolute_position = offset;
file_offset_of_buffer_end = offset;
bytes_to_ignore = ignore;
if (bytes_to_ignore)
assert(initialized());
auto result = nextImpl();
bytes_to_ignore = 0;
if (result)
return working_buffer.size();
return {working_buffer.size(), BufferBase::offset()};
return 0;
return {0, 0};
}
void ReadBufferFromRemoteFSGather::initialize()
{
/// One clickhouse file can be split into multiple files in remote fs.
auto current_buf_offset = absolute_position;
auto current_buf_offset = file_offset_of_buffer_end;
for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
{
const auto & [file_path, size] = metadata.remote_fs_objects[i];
@ -144,7 +145,6 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
return readImpl();
}
bool ReadBufferFromRemoteFSGather::readImpl()
{
swap(*current_buf);
@ -155,15 +155,26 @@ bool ReadBufferFromRemoteFSGather::readImpl()
* we save how many bytes need to be ignored (new_offset - position() bytes).
*/
if (bytes_to_ignore)
{
current_buf->ignore(bytes_to_ignore);
bytes_to_ignore = 0;
}
auto result = current_buf->next();
bool result = current_buf->hasPendingData();
if (result)
{
/// bytes_to_ignore already added.
file_offset_of_buffer_end += current_buf->available();
}
else
{
result = current_buf->next();
if (result)
file_offset_of_buffer_end += current_buf->buffer().size();
}
swap(*current_buf);
if (result)
absolute_position += working_buffer.size();
return result;
}
@ -180,7 +191,6 @@ void ReadBufferFromRemoteFSGather::reset()
current_buf.reset();
}
String ReadBufferFromRemoteFSGather::getFileName() const
{
return canonical_path;


@ -37,10 +37,20 @@ public:
void setReadUntilPosition(size_t position) override;
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
struct ReadResult
{
size_t size = 0;
size_t offset = 0;
};
ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
size_t getFileSize() const;
size_t offset() const { return file_offset_of_buffer_end; }
bool initialized() const { return current_buf != nullptr; }
protected:
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const = 0;
@ -57,8 +67,13 @@ private:
size_t current_buf_idx = 0;
size_t absolute_position = 0;
size_t file_offset_of_buffer_end = 0;
/**
* File: |___________________|
* Buffer: |~~~~~~~|
* file_offset_of_buffer_end: ^
*/
size_t bytes_to_ignore = 0;
size_t read_until_position = 0;


@ -20,7 +20,7 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
off_t ReadIndirectBufferFromRemoteFS::getPosition()
{
return impl->absolute_position - available();
return impl->file_offset_of_buffer_end - available();
}
@ -35,29 +35,29 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
if (whence == SEEK_CUR)
{
/// If position within current working buffer - shift pos.
if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->absolute_position)
if (!working_buffer.empty() && size_t(getPosition() + offset_) < impl->file_offset_of_buffer_end)
{
pos += offset_;
return getPosition();
}
else
{
impl->absolute_position += offset_;
impl->file_offset_of_buffer_end += offset_;
}
}
else if (whence == SEEK_SET)
{
/// If position within current working buffer - shift pos.
if (!working_buffer.empty()
&& size_t(offset_) >= impl->absolute_position - working_buffer.size()
&& size_t(offset_) < impl->absolute_position)
&& size_t(offset_) >= impl->file_offset_of_buffer_end - working_buffer.size()
&& size_t(offset_) < impl->file_offset_of_buffer_end)
{
pos = working_buffer.end() - (impl->absolute_position - offset_);
pos = working_buffer.end() - (impl->file_offset_of_buffer_end - offset_);
return getPosition();
}
else
{
impl->absolute_position = offset_;
impl->file_offset_of_buffer_end = offset_;
}
}
else
@ -66,7 +66,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
impl->reset();
pos = working_buffer.end();
return impl->absolute_position;
return impl->file_offset_of_buffer_end;
}
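
The rename from absolute_position to file_offset_of_buffer_end also spells out the invariant used here: the externally visible position is the end offset minus the bytes still unread, and a SEEK_SET target can be served in place only if it falls inside [file_offset_of_buffer_end - buffer_size, file_offset_of_buffer_end). A small sketch of that window check (hypothetical struct, not the real buffer):

    #include <cstddef>
    #include <iostream>

    struct BufferWindow
    {
        size_t file_offset_of_buffer_end = 0; // file offset just past the buffered bytes
        size_t buffer_size = 0;               // bytes currently held in the working buffer
        size_t pos_from_begin = 0;            // read cursor inside the buffer

        size_t available() const { return buffer_size - pos_from_begin; }

        // Matches getPosition() above: end offset minus what is still unread.
        size_t position() const { return file_offset_of_buffer_end - available(); }

        // A SEEK_SET target can be served by only moving the cursor
        // if it lies inside the bytes the buffer already covers.
        bool seekInPlace(size_t target)
        {
            if (buffer_size != 0
                && target >= file_offset_of_buffer_end - buffer_size
                && target < file_offset_of_buffer_end)
            {
                pos_from_begin = buffer_size - (file_offset_of_buffer_end - target);
                return true;
            }
            return false; // caller must adjust file_offset_of_buffer_end and reset the impl
        }
    };

    int main()
    {
        BufferWindow w{2048, 1024, 100};   // 1024-byte buffer ending at offset 2048, cursor at 100
        std::cout << w.position() << "\n";            // 1124
        bool moved = w.seekInPlace(1500);
        std::cout << moved << " " << w.position() << "\n";   // 1 1500
        std::cout << w.seekInPlace(4096) << "\n";     // 0: outside the buffered window
        return 0;
    }
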


@ -8,7 +8,6 @@
#include <Common/setThreadName.h>
#include <IO/SeekableReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <future>
#include <iostream>
@ -28,7 +27,7 @@ namespace CurrentMetrics
namespace DB
{
size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
ReadBufferFromRemoteFSGather::ReadResult ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore)
{
return reader->readInto(data, size, offset, ignore);
}
@ -44,18 +43,18 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
{
auto task = std::make_shared<std::packaged_task<Result()>>([request]
{
setThreadName("ThreadPoolRemoteFSRead");
setThreadName("VFSRead");
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
Stopwatch watch(CLOCK_MONOTONIC);
auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
auto [bytes_read, offset] = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch.stop();
ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, bytes_read);
return bytes_read;
return Result{ .size = bytes_read, .offset = offset };
});
auto future = task->get_future();
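
For readers unfamiliar with the pattern: wrapping the lambda in a std::packaged_task is what turns the pool job into a std::future<Result> the buffer can block on later (prefetch_future.get()). A stripped-down sketch of the same plumbing, using a plain std::thread in place of the ClickHouse thread pool and a hypothetical Result:

    #include <cstddef>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <thread>

    struct Result
    {
        size_t size = 0;
        size_t offset = 0;
    };

    int main()
    {
        // Package the work: the promise/future plumbing lives inside packaged_task.
        auto task = std::make_shared<std::packaged_task<Result()>>([]
        {
            // ... perform the read here ...
            size_t bytes_read = 4096;
            return Result{ .size = bytes_read, .offset = 0 };
        });

        std::future<Result> future = task->get_future();

        // Hand the task to a worker; the real code schedules it on a thread pool.
        std::thread worker([task] { (*task)(); });

        Result r = future.get(); // caller blocks here, as with prefetch_future.get()
        std::cout << r.size << " bytes\n";

        worker.join();
        return 0;
    }
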


@ -3,12 +3,12 @@
#include <IO/AsynchronousReader.h>
#include <IO/SeekableReadBuffer.h>
#include <Common/ThreadPool.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IDiskRemote.h>
namespace DB
{
class ReadBufferFromRemoteFSGather;
class ThreadPoolRemoteFSReader : public IAsynchronousReader
{
@ -28,9 +28,9 @@ public:
struct ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor
{
public:
RemoteFSFileDescriptor(std::shared_ptr<ReadBufferFromRemoteFSGather> reader_) : reader(reader_) {}
explicit RemoteFSFileDescriptor(std::shared_ptr<ReadBufferFromRemoteFSGather> reader_) : reader(reader_) {}
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
ReadBufferFromRemoteFSGather::ReadResult readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
private:
std::shared_ptr<ReadBufferFromRemoteFSGather> reader;


@ -69,7 +69,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
size = prefetch_future.get();
auto result = prefetch_future.get();
size = result.size;
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
@ -90,7 +91,7 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
{
/// No pending request. Do synchronous read.
auto size = readInto(memory.data(), memory.size()).get();
auto [size, _] = readInto(memory.data(), memory.size()).get();
file_offset_of_buffer_end += size;
if (size)
@ -201,4 +202,3 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
}
}


@ -49,10 +49,18 @@ public:
size_t ignore = 0;
};
/// Less than requested amount of data can be returned.
/// If size is zero - the file has ended.
/// (for example, EINTR must be handled by implementation automatically)
using Result = size_t;
struct Result
{
/// size
/// Less than requested amount of data can be returned.
/// If size is zero - the file has ended.
/// (for example, EINTR must be handled by implementation automatically)
size_t size = 0;
/// offset
/// Optional. Useful when implementation needs to do ignore().
size_t offset = 0;
};
/// Submit request and obtain a handle. This method doesn't perform any waits.
/// If this method did not throw, the caller must wait for the result with 'wait' method


@ -82,10 +82,9 @@ std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request reque
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return bytes_read;
return Result{ .size = bytes_read, .offset = 0};
});
}
}


@ -117,7 +117,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
if (!res)
{
/// The file has ended.
promise.set_value(0);
promise.set_value({0, 0});
watch.stop();
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
@ -176,7 +176,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
promise.set_value(bytes_read);
promise.set_value({bytes_read, 0});
return future;
}
}
@ -219,7 +219,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return bytes_read;
return Result{ .size = bytes_read, .offset = 0 };
});
auto future = task->get_future();


@ -116,82 +116,62 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q
}
}
/// In case of expression/function (order by 1+2 and 2*x1, greatest(1, 2)) replace
/// positions only if all literals are numbers, otherwise it is not positional.
bool positional = true;
const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get());
if (!ast_literal)
return false;
/// Case when GROUP BY element is position.
if (const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get()))
auto which = ast_literal->value.getType();
if (which != Field::Types::UInt64)
return false;
auto pos = ast_literal->value.get<UInt64>();
if (!pos || pos > columns.size())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Positional argument out of bounds: {} (exprected in range [1, {}]",
pos, columns.size());
const auto & column = columns[--pos];
if (typeid_cast<const ASTIdentifier *>(column.get()))
{
auto which = ast_literal->value.getType();
if (which == Field::Types::UInt64)
argument = column->clone();
}
else if (typeid_cast<const ASTFunction *>(column.get()))
{
std::function<void(ASTPtr)> throw_if_aggregate_function = [&](ASTPtr node)
{
auto pos = ast_literal->value.get<UInt64>();
if (pos > 0 && pos <= columns.size())
if (const auto * function = typeid_cast<const ASTFunction *>(node.get()))
{
const auto & column = columns[--pos];
if (typeid_cast<const ASTIdentifier *>(column.get()))
auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name);
if (is_aggregate_function)
{
argument = column->clone();
}
else if (typeid_cast<const ASTFunction *>(column.get()))
{
std::function<void(ASTPtr)> throw_if_aggregate_function = [&](ASTPtr node)
{
if (const auto * function = typeid_cast<const ASTFunction *>(node.get()))
{
auto is_aggregate_function = AggregateFunctionFactory::instance().isAggregateFunctionName(function->name);
if (is_aggregate_function)
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal value (aggregate function) for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
}
else
{
if (function->arguments)
{
for (const auto & arg : function->arguments->children)
throw_if_aggregate_function(arg);
}
}
}
};
if (expression == ASTSelectQuery::Expression::GROUP_BY)
throw_if_aggregate_function(column);
argument = column->clone();
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal value (aggregate function) for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
}
else
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal value for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
if (function->arguments)
{
for (const auto & arg : function->arguments->children)
throw_if_aggregate_function(arg);
}
}
}
else if (pos > columns.size() || !pos)
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Positional argument out of bounds: {} (exprected in range [1, {}]",
pos, columns.size());
}
}
else
positional = false;
}
else if (const auto * ast_function = typeid_cast<const ASTFunction *>(argument.get()))
{
if (ast_function->arguments)
{
for (auto & arg : ast_function->arguments->children)
positional &= checkPositionalArguments(arg, select_query, expression);
}
};
if (expression == ASTSelectQuery::Expression::GROUP_BY)
throw_if_aggregate_function(column);
argument = column->clone();
}
else
positional = false;
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal value for positional argument in {}",
ASTSelectQuery::expressionToString(expression));
}
return positional;
return true;
}
void replaceForPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_query, ASTSelectQuery::Expression expression)
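
In plain terms, the rewritten check now treats only a bare unsigned integer literal as positional: it must lie in [1, number of SELECT expressions], it is replaced by the matching SELECT expression, and for GROUP BY that expression must not contain an aggregate function; compound expressions such as 1 + 1 are simply left alone, which is why the order by 1 + 1 cases disappear from the tests further down. A toy sketch of the literal handling over strings rather than ASTs, with the aggregate check omitted (hypothetical code, not the ClickHouse implementation):

    #include <cstddef>
    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Rewrites `argument` and returns true only when it is a bare positive integer
    // literal; expressions like "1 + 1" are no longer treated as positional.
    bool replaceIfPositional(std::string & argument, const std::vector<std::string> & select_columns)
    {
        if (argument.empty() || argument.find_first_not_of("0123456789") != std::string::npos)
            return false; // not a plain unsigned literal

        size_t pos = std::stoull(argument);
        if (pos == 0 || pos > select_columns.size())
            throw std::out_of_range(
                "Positional argument out of bounds: " + argument
                + " (expected in range [1, " + std::to_string(select_columns.size()) + "])");

        argument = select_columns[pos - 1]; // 1-based position -> column expression
        return true;
    }

    int main()
    {
        std::vector<std::string> columns{"x3", "x2", "x1"};

        std::string a = "2";
        std::cout << replaceIfPositional(a, columns) << " " << a << "\n"; // 1 x2

        std::string b = "1 + 1";
        std::cout << replaceIfPositional(b, columns) << " " << b << "\n"; // 0 1 + 1
        return 0;
    }
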


@ -557,6 +557,8 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
std::lock_guard lock(conn_stats_mutex);
conn_stats.updateLatency(elapsed);
}
operations.erase(response->xid);
keeper_dispatcher->updateKeeperStatLatency(elapsed);
last_op.set(std::make_unique<LastOp>(LastOp{


@ -93,7 +93,7 @@ private:
Poco::Timestamp established;
using Operations = std::map<Coordination::XID, Poco::Timestamp>;
using Operations = std::unordered_map<Coordination::XID, Poco::Timestamp>;
Operations operations;
LastOpMultiVersion last_op;


@ -198,7 +198,9 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
for (const auto i : collections::range(0, parts.size()))
{
const auto & part = parts[i];
is_part_on_remote_disk[i] = part.data_part->isStoredOnRemoteDisk();
bool part_on_remote_disk = part.data_part->isStoredOnRemoteDisk();
is_part_on_remote_disk[i] = part_on_remote_disk;
do_not_steal_tasks |= part_on_remote_disk;
/// Read marks for every data part.
size_t sum_marks = 0;


@ -203,6 +203,8 @@ void MergeTreeReaderCompact::readData(
{
const auto & [name, type] = name_and_type;
adjustUpperBound(current_task_last_mark); /// Must go before seek.
if (!isContinuousReading(from_mark, column_position))
seekToMark(from_mark, column_position);
@ -211,8 +213,6 @@ void MergeTreeReaderCompact::readData(
if (only_offsets && (substream_path.size() != 1 || substream_path[0].type != ISerialization::Substream::ArraySizes))
return nullptr;
/// For asynchronous reading from remote fs.
data_buffer->setReadUntilPosition(marks_loader.getMark(current_task_last_mark).offset_in_compressed_file);
return data_buffer;
};
@ -275,6 +275,34 @@ void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
}
}
void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark)
{
auto right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (!right_offset)
{
/// If already reading till the end of file.
if (last_right_offset && *last_right_offset == 0)
return;
last_right_offset = 0; // Zero value means the end of file.
if (cached_buffer)
cached_buffer->setReadUntilEnd();
if (non_cached_buffer)
non_cached_buffer->setReadUntilEnd();
}
else
{
if (last_right_offset && right_offset <= last_right_offset.value())
return;
last_right_offset = right_offset;
if (cached_buffer)
cached_buffer->setReadUntilPosition(right_offset);
if (non_cached_buffer)
non_cached_buffer->setReadUntilPosition(right_offset);
}
}
bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position)
{
if (!last_read_granule)


@ -52,6 +52,9 @@ private:
/// Should we read full column or only its offsets
std::vector<bool> read_only_offsets;
/// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream.
std::optional<size_t> last_right_offset;
size_t next_mark = 0;
std::optional<std::pair<size_t, size_t>> last_read_granule;
@ -67,6 +70,9 @@ private:
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges);
/// For asynchronous reading from remote fs.
void adjustUpperBound(size_t last_mark);
};
}
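
The new adjustUpperBound() bookkeeping only ever widens what is requested from the remote buffer: a mark offset of zero means "read until end of file", and a bound that is not larger than the one already applied is a no-op. A minimal sketch of that logic with a hypothetical tracker type:

    #include <cstddef>
    #include <iostream>
    #include <optional>

    enum class Action { None, ReadUntilEnd, ReadUntilPosition };

    struct UpperBoundTracker
    {
        std::optional<size_t> last_right_offset; // 0 stands for "end of file"

        Action adjust(size_t right_offset)
        {
            if (right_offset == 0)
            {
                if (last_right_offset && *last_right_offset == 0)
                    return Action::None;          // already reading till the end
                last_right_offset = 0;
                return Action::ReadUntilEnd;
            }
            if (last_right_offset && right_offset <= *last_right_offset)
                return Action::None;              // smaller or equal bound: nothing to do
            last_right_offset = right_offset;
            return Action::ReadUntilPosition;     // apply right_offset to the buffers
        }
    };

    int main()
    {
        UpperBoundTracker t;
        std::cout << static_cast<int>(t.adjust(1000)) << "\n"; // 2: set bound to 1000
        std::cout << static_cast<int>(t.adjust(800)) << "\n";  // 0: smaller bound, keep 1000
        std::cout << static_cast<int>(t.adjust(0)) << "\n";    // 1: switch to read-until-end
        std::cout << static_cast<int>(t.adjust(0)) << "\n";    // 0: already until end
        return 0;
    }
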


@ -1,12 +1,21 @@
#!/usr/bin/env python3
from collections import namedtuple
import json
import time
import jwt
import jwt
import requests
import boto3
NEED_RERUN_OR_CANCELL_WORKFLOWS = {
13241696, # PR
15834118, # Docs
15522500, # MasterCI
15516108, # ReleaseCI
15797242, # BackportPR
}
# https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run
#
API_URL = 'https://api.github.com/repos/ClickHouse/ClickHouse'
@ -70,19 +79,32 @@ def _exec_get_with_retry(url):
raise Exception("Cannot execute GET request with retries")
def get_workflows_cancel_urls_for_pull_request(pull_request_event):
WorkflowDescription = namedtuple('WorkflowDescription',
['run_id', 'status', 'rerun_url', 'cancel_url'])
def get_workflows_description_for_pull_request(pull_request_event):
head_branch = pull_request_event['head']['ref']
print("PR", pull_request_event['number'], "has head ref", head_branch)
workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}")
workflows_urls_to_cancel = set([])
workflow_descriptions = []
for workflow in workflows['workflow_runs']:
if workflow['status'] != 'completed':
print("Workflow", workflow['url'], "not finished, going to be cancelled")
workflows_urls_to_cancel.add(workflow['cancel_url'])
else:
print("Workflow", workflow['url'], "already finished, will not try to cancel")
if workflow['workflow_id'] in NEED_RERUN_OR_CANCELL_WORKFLOWS:
workflow_descriptions.append(WorkflowDescription(
run_id=workflow['id'],
status=workflow['status'],
rerun_url=workflow['rerun_url'],
cancel_url=workflow['cancel_url']))
return workflows_urls_to_cancel
return workflow_descriptions
def get_workflow_description(workflow_id):
workflow = _exec_get_with_retry(API_URL + f"/actions/runs/{workflow_id}")
return WorkflowDescription(
run_id=workflow['id'],
status=workflow['status'],
rerun_url=workflow['rerun_url'],
cancel_url=workflow['cancel_url'])
def _exec_post_with_retry(url, token):
headers = {
@ -99,11 +121,11 @@ def _exec_post_with_retry(url, token):
raise Exception("Cannot execute POST request with retry")
def cancel_workflows(urls_to_cancel, token):
def exec_workflow_url(urls_to_cancel, token):
for url in urls_to_cancel:
print("Cancelling workflow using url", url)
print("Post for workflow workflow using url", url)
_exec_post_with_retry(url, token)
print("Workflow cancelled")
print("Workflow post finished")
def main(event):
token = get_token_from_aws()
@ -117,9 +139,39 @@ def main(event):
print("PR has labels", labels)
if action == 'closed' or 'do not test' in labels:
print("PR merged/closed or manually labeled 'do not test' will kill workflows")
workflows_to_cancel = get_workflows_cancel_urls_for_pull_request(pull_request)
print(f"Found {len(workflows_to_cancel)} workflows to cancel")
cancel_workflows(workflows_to_cancel, token)
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
urls_to_cancel = []
for workflow_description in workflow_descriptions:
if workflow_description.status != 'completed':
urls_to_cancel.append(workflow_description.cancel_url)
print(f"Found {len(urls_to_cancel)} workflows to cancel")
exec_workflow_url(urls_to_cancel, token)
elif action == 'labeled' and 'can be tested' in labels:
print("PR marked with can be tested label, rerun workflow")
workflow_descriptions = get_workflows_description_for_pull_request(pull_request)
if not workflow_descriptions:
print("Not found any workflows")
return
sorted_workflows = list(sorted(workflow_descriptions, key=lambda x: x.run_id))
most_recent_workflow = sorted_workflows[-1]
print("Latest workflow", most_recent_workflow)
if most_recent_workflow.status != 'completed':
print("Latest workflow is not completed, cancelling")
exec_workflow_url([most_recent_workflow.cancel_url], token)
print("Cancelled")
for _ in range(30):
latest_workflow_desc = get_workflow_description(most_recent_workflow.run_id)
print("Checking latest workflow", latest_workflow_desc)
if latest_workflow_desc.status in ('completed', 'cancelled'):
print("Finally latest workflow done, going to rerun")
exec_workflow_url([most_recent_workflow.rerun_url], token)
print("Rerun finished, exiting")
break
print("Still have strange status")
time.sleep(3)
else:
print("Nothing to do")


@ -0,0 +1,78 @@
#!/usr/bin/env python3
import os
import subprocess
import logging
from github import Github
from env_helper import IMAGES_PATH, REPO_COPY
from stopwatch import Stopwatch
from upload_result_helper import upload_results
from s3_helper import S3Helper
from get_robot_token import get_best_robot_token
from pr_info import PRInfo
from commit_status_helper import post_commit_status
from docker_pull_helper import get_image_with_version
from tee_popen import TeePopen
NAME = "Woboq Build (actions)"
def get_run_command(repo_path, output_path, image):
cmd = "docker run " + \
f"--volume={repo_path}:/repo_folder " \
f"--volume={output_path}:/test_output " \
f"-e 'DATA=https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/data' {image}"
return cmd
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = os.getenv("TEMP_PATH", os.path.abspath("."))
pr_info = PRInfo()
gh = Github(get_best_robot_token())
if not os.path.exists(temp_path):
os.makedirs(temp_path)
docker_image = get_image_with_version(IMAGES_PATH, 'clickhouse/codebrowser')
s3_helper = S3Helper('https://s3.amazonaws.com')
result_path = os.path.join(temp_path, "result_path")
if not os.path.exists(result_path):
os.makedirs(result_path)
run_command = get_run_command(REPO_COPY, result_path, docker_image)
logging.info("Going to run codebrowser: %s", run_command)
run_log_path = os.path.join(temp_path, "runlog.log")
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
report_path = os.path.join(result_path, "html_report")
logging.info("Report path %s", report_path)
s3_path_prefix = "codebrowser"
html_urls = s3_helper.fast_parallel_upload_dir(report_path, s3_path_prefix, 'clickhouse-test-reports')
index_html = '<a href="https://s3.amazonaws.com/clickhouse-test-reports/codebrowser/index.html">HTML report</a>'
test_results = [(index_html, "Look at the report")]
report_url = upload_results(s3_helper, pr_info.number, pr_info.sha, test_results, [], NAME)
print(f"::notice ::Report url: {report_url}")
post_commit_status(gh, pr_info.sha, NAME, "Report built", "success", report_url)


@ -33,7 +33,7 @@ def get_pr_for_commit(sha, ref):
class PRInfo:
def __init__(self, github_event=None, need_orgs=False, need_changed_files=False):
def __init__(self, github_event=None, need_orgs=False, need_changed_files=False, labels_from_api=False):
if not github_event:
if GITHUB_EVENT_PATH:
with open(GITHUB_EVENT_PATH, 'r', encoding='utf-8') as event_file:
@ -61,7 +61,12 @@ class PRInfo:
self.head_ref = github_event['pull_request']['head']['ref']
self.head_name = github_event['pull_request']['head']['repo']['full_name']
self.labels = {l['name'] for l in github_event['pull_request']['labels']}
if labels_from_api:
response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels")
self.labels = {l['name'] for l in response.json()}
else:
self.labels = {l['name'] for l in github_event['pull_request']['labels']}
self.user_login = github_event['pull_request']['user']['login']
self.user_orgs = set([])
if need_orgs:
@ -90,7 +95,12 @@ class PRInfo:
f"https://api.github.com/repos/{GITHUB_REPOSITORY}/compare/{github_event['before']}...{self.sha}"
else:
self.number = pull_request['number']
self.labels = {l['name'] for l in pull_request['labels']}
if labels_from_api:
response = requests.get(f"https://api.github.com/repos/{GITHUB_REPOSITORY}/issues/{self.number}/labels")
self.labels = {l['name'] for l in response.json()}
else:
self.labels = {l['name'] for l in pull_request['labels']}
self.base_ref = pull_request['base']['ref']
self.base_name = pull_request['base']['repo']['full_name']
self.head_ref = pull_request['head']['ref']


@ -90,6 +90,7 @@ def pr_is_by_trusted_user(pr_user_login, pr_user_orgs):
# can be skipped entirely.
def should_run_checks_for_pr(pr_info):
# Consider the labels and whether the user is trusted.
print("Got labels", pr_info.labels)
force_labels = set(['force tests']).intersection(pr_info.labels)
if force_labels:
return True, "Labeled '{}'".format(', '.join(force_labels))
@ -109,7 +110,7 @@ def should_run_checks_for_pr(pr_info):
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
pr_info = PRInfo(need_orgs=True)
pr_info = PRInfo(need_orgs=True, labels_from_api=True)
can_run, description = should_run_checks_for_pr(pr_info)
gh = Github(get_best_robot_token())
commit = get_commit(gh, pr_info.sha)


@ -4,6 +4,7 @@ import logging
import os
import re
import shutil
import time
from multiprocessing.dummy import Pool
import boto3
@ -83,6 +84,58 @@ class S3Helper:
else:
return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path)
def fast_parallel_upload_dir(self, dir_path, s3_dir_path, bucket_name):
all_files = []
for root, _, files in os.walk(dir_path):
for file in files:
all_files.append(os.path.join(root, file))
logging.info("Files found %s", len(all_files))
counter = 0
t = time.time()
sum_time = 0
def upload_task(file_path):
nonlocal counter
nonlocal t
nonlocal sum_time
try:
s3_path = file_path.replace(dir_path, s3_dir_path)
metadata = {}
if s3_path.endswith("html"):
metadata['ContentType'] = "text/html; charset=utf-8"
elif s3_path.endswith("css"):
metadata['ContentType'] = "text/css; charset=utf-8"
elif s3_path.endswith("js"):
metadata['ContentType'] = "text/javascript; charset=utf-8"
# Retry
for i in range(5):
try:
self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata)
break
except Exception as ex:
if i == 4:
raise ex
time.sleep(0.1 * i)
counter += 1
if counter % 1000 == 0:
sum_time += int(time.time() - t)
print("Uploaded", counter, "-", int(time.time() - t), "s", "sum time", sum_time, "s")
t = time.time()
except Exception as ex:
logging.critical("Failed to upload file, expcetion %s", ex)
return "https://s3.amazonaws.com/{bucket}/{path}".format(bucket=bucket_name, path=s3_path)
p = Pool(256)
logging.basicConfig(level=logging.CRITICAL)
result = sorted(_flatten_list(p.map(upload_task, all_files)))
logging.basicConfig(level=logging.INFO)
return result
def _upload_folder_to_s3(self, folder_path, s3_folder_path, bucket_name, keep_dirs_in_s3_path, upload_symlinks):
logging.info("Upload folder '%s' to bucket=%s of s3 folder '%s'", folder_path, bucket_name, s3_folder_path)
if not os.path.exists(folder_path):


@ -456,3 +456,16 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name):
for i in range(30):
print(f"Read sequence {i}")
assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"]
@pytest.mark.parametrize("node_name", ["node"])
def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query("CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';")
node.query("INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000")
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)


@ -37,7 +37,6 @@ def fail_request(cluster, request):
["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
def throttle_request(cluster, request):
response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/throttle_request/{}".format(request)])


@ -7,6 +7,7 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml'.format(get_instances_dir()))
COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/clusters.xml"]


@ -36,7 +36,6 @@ def get_query_stat(instance, hint):
result[ev[0]] = int(ev[1])
return result
@pytest.mark.parametrize("min_rows_for_wide_part,read_requests", [(0, 2), (8192, 1)])
def test_write_is_cached(cluster, min_rows_for_wide_part, read_requests):
node = cluster.instances["node"]


@ -65,7 +65,6 @@ def create_table(cluster, additional_settings=None):
list(cluster.instances.values())[0].query(create_table_statement)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield


@ -67,8 +67,8 @@ def rabbitmq_cluster():
def rabbitmq_setup_teardown():
print("RabbitMQ is available - running test")
yield # run test
for table_name in ['view', 'consumer', 'rabbitmq']:
instance.query(f'DROP TABLE IF EXISTS test.{table_name}')
instance.query('DROP DATABASE test NO DELAY')
instance.query('CREATE DATABASE test')
# Tests


@ -46,22 +46,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 1;
100 100 1
10 1 10
1 10 100
explain syntax select x3, x2, x1 from test order by 1 + 1;
SELECT
x3,
x2,
x1
FROM test
ORDER BY x3 + x3 ASC
explain syntax select x3, x2, x1 from test order by (1 + 1) * 3;
SELECT
x3,
x2,
x1
FROM test
ORDER BY (x3 + x3) * x1 ASC
select x2, x1 from test group by x2 + x1; -- { serverError 215 }
select x2, x1 from test group by 1 + 2; -- { serverError 215 }
explain syntax select x3, x2, x1 from test order by 1;
SELECT
x3,
@ -110,27 +94,6 @@ GROUP BY
x2
select max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select x1 + x2, x3 from test group by x1 + x2, x3;
11 100
200 1
11 200
11 10
select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2
1 100 100
1 100 100
10 1 10
100 10 1
200 1 10
200 10 1
explain syntax select x1, x3 from test group by 1 + 2, 1, 2;
SELECT
x1,
x3
FROM test
GROUP BY
x1 + x3,
x1,
x3
explain syntax select x1 + x3, x3 from test group by 1, 2;
SELECT
x1 + x3,
@ -152,3 +115,5 @@ SELECT 1 + 1 AS a
GROUP BY a
select substr('aaaaaaaaaaaaaa', 8) as a group by a;
aaaaaaa
select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8);
aaaaaaa


@ -22,12 +22,6 @@ select x1, x2, x3 from test order by 3 limit 1 by 3;
select x1, x2, x3 from test order by x3 limit 1 by x1;
select x1, x2, x3 from test order by 3 limit 1 by 1;
explain syntax select x3, x2, x1 from test order by 1 + 1;
explain syntax select x3, x2, x1 from test order by (1 + 1) * 3;
select x2, x1 from test group by x2 + x1; -- { serverError 215 }
select x2, x1 from test group by 1 + 2; -- { serverError 215 }
explain syntax select x3, x2, x1 from test order by 1;
explain syntax select x3 + 1, x2, x1 from test order by 1;
explain syntax select x3, x3 - x2, x2, x1 from test order by 2;
@ -37,11 +31,7 @@ explain syntax select 1 + greatest(x1, 1), x2 from test group by 1, 2;
select max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select 1 + max(x1), x2 from test group by 1, 2; -- { serverError 43 }
select x1 + x2, x3 from test group by x1 + x2, x3;
select x3, x2, x1 from test order by x3 * 2, x2, x1; -- check x3 * 2 does not become x3 * x2
explain syntax select x1, x3 from test group by 1 + 2, 1, 2;
explain syntax select x1 + x3, x3 from test group by 1, 2;
create table test2(x1 Int, x2 Int, x3 Int) engine=Memory;
@ -52,3 +42,5 @@ select a, b, c, d, e, f from (select 44 a, 88 b, 13 c, 14 d, 15 e, 16 f) t grou
explain syntax select plus(1, 1) as a group by a;
select substr('aaaaaaaaaaaaaa', 8) as a group by a;
select substr('aaaaaaaaaaaaaa', 8) as a group by substr('aaaaaaaaaaaaaa', 8);