Fix some tests

This commit is contained in:
Alexey Milovidov 2021-02-07 23:37:55 +03:00
parent 5b62b89752
commit 869bca74a7
6 changed files with 21 additions and 27 deletions

View File

@ -108,6 +108,7 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
/// If the decompressed block fits entirely where it needs to be copied. /// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read) if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{ {
//std::cerr << "readBig " << file_in.getFileName() << "\n";
decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum); decompress(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed; bytes_read += size_decompressed;
bytes += size_decompressed; bytes += size_decompressed;

View File

@ -1,18 +1,23 @@
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <DataStreams/AsynchronousBlockInputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <IO/copyData.h> #include <Interpreters/DatabaseCatalog.h>
#include <IO/ReadBufferFromIStream.h> #include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h> #include <IO/LimitReadBuffer.h>
#include <Storages/StorageMemory.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Processors/Sources/SinkToOutputStream.h> #include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h> #include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Core/ExternalTable.h> #include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h> #include <Poco/Net/MessageHeader.h>
#include <Formats/FormatFactory.h>
#include <common/find_symbols.h> #include <common/find_symbols.h>
@ -29,17 +34,16 @@ ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
{ {
initReadBuffer(); initReadBuffer();
initSampleBlock(); initSampleBlock();
auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE); auto input = FormatFactory::instance().getInputFormat(format, *read_buffer, sample_block, context, DEFAULT_BLOCK_SIZE);
auto stream = std::make_shared<AsynchronousBlockInputStream>(input);
auto data = std::make_unique<ExternalTableData>(); auto data = std::make_unique<ExternalTableData>();
data->table_name = name; data->table_name = name;
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromInputStream>(std::move(stream))); data->pipe = std::make_unique<Pipe>(std::move(input));
return data; return data;
} }
void BaseExternalTable::clean() void BaseExternalTable::clear()
{ {
name.clear(); name.clear();
file.clear(); file.clear();
@ -49,17 +53,6 @@ void BaseExternalTable::clean()
read_buffer.reset(); read_buffer.reset();
} }
/// Function for debugging information output
void BaseExternalTable::write()
{
std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl;
std::cerr << "format " << format << std::endl;
std::cerr << "structure: \n";
for (const auto & elem : structure)
std::cerr << '\t' << elem.first << ' ' << elem.second << std::endl;
}
void BaseExternalTable::parseStructureFromStructureField(const std::string & argument) void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
{ {
std::vector<std::string> vals; std::vector<std::string> vals;
@ -182,7 +175,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
executor->execute(/*num_threads = */ 1); executor->execute(/*num_threads = */ 1);
/// We are ready to receive the next file, for this we clear all the information received /// We are ready to receive the next file, for this we clear all the information received
clean(); clear();
} }
} }

View File

@ -61,10 +61,7 @@ public:
protected: protected:
/// Clear all accumulated information /// Clear all accumulated information
void clean(); void clear();
/// Function for debugging information output
void write();
/// Construct the `structure` vector from the text field `structure` /// Construct the `structure` vector from the text field `structure`
virtual void parseStructureFromStructureField(const std::string & argument); virtual void parseStructureFromStructureField(const std::string & argument);

View File

@ -12,7 +12,6 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
#include <Storages/StorageMemory.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Interpreters/InDepthNodeVisitor.h> #include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/IdentifierSemantic.h> #include <Interpreters/IdentifierSemantic.h>

View File

@ -22,7 +22,6 @@
#include <Interpreters/TablesStatus.h> #include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h> #include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h> #include <Interpreters/OpenTelemetrySpanLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h> #include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Core/ExternalTable.h> #include <Core/ExternalTable.h>
@ -214,6 +213,9 @@ void TCPHandler::runImpl()
/// Get blocks of temporary tables /// Get blocks of temporary tables
readData(connection_settings); readData(connection_settings);
if (state.io.out)
state.io.out->writeSuffix();
/// Reset the input stream, as we received an empty block while receiving external table data. /// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore. /// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset(); state.block_in.reset();

View File

@ -1,7 +1,9 @@
SET max_insert_threads = 1, max_threads = 100, min_insert_block_size_rows = 1048576, max_block_size = 65536; SET max_insert_threads = 1, max_threads = 100, min_insert_block_size_rows = 1048576, max_block_size = 65536;
CREATE TEMPORARY TABLE t (x UInt64); DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt64) ENGINE = StripeLog;
-- For trivial INSERT SELECT, max_threads is lowered to max_insert_threads and max_block_size is changed to min_insert_block_size_rows. -- For trivial INSERT SELECT, max_threads is lowered to max_insert_threads and max_block_size is changed to min_insert_block_size_rows.
INSERT INTO t SELECT * FROM numbers_mt(1000000); INSERT INTO t SELECT * FROM numbers_mt(1000000);
SET max_threads = 1; SET max_threads = 1;
-- If data was inserted by more threads, we will probably see data out of order. -- If data was inserted by more threads, we will probably see data out of order.
SELECT DISTINCT blockSize(), runningDifference(x) FROM t; SELECT DISTINCT blockSize(), runningDifference(x) FROM t;
DROP TABLE t;