From cb25fafb43fe3b1b1508dd7e6302481f402e9c7c Mon Sep 17 00:00:00 2001 From: Dmitrii Makarenko Date: Mon, 16 Oct 2023 08:01:48 -0700 Subject: [PATCH] [Async] Change std::async to tbb This commit resolves segfault in H20 benchmark on 1e9 data. Currently a system_error occurs during checksum calculation. According to cppref: https://en.cppreference.com/w/cpp/thread/async. System error is thrown if there are not enough resources to create a new thread: ``` std::system_error with error condition std::errc::resource_unavailable_try_again, if policy == std::launch::async and the implementation is unable to start a new thread. If policy is std::launch::async | std::launch::deferred or has additional bits set, it will fall back to deferred invocation or the implementation-defined policies in this case. ``` To avoid this situation, I rewrote this method to use tbb, which should automatically check for available threads in the thread pool. Signed-off-by: Dmitrii Makarenko --- .../ResultSetRegistry/ColumnarResults.cpp | 80 +++++++++---------- 1 file changed, 37 insertions(+), 43 deletions(-) diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.cpp b/omniscidb/ResultSetRegistry/ColumnarResults.cpp index a22924e85..6df9a91f2 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.cpp +++ b/omniscidb/ResultSetRegistry/ColumnarResults.cpp @@ -623,14 +623,15 @@ void ColumnarResults::materializeAllLazyColumns( const ResultSet& rows, const size_t num_columns) { CHECK(isDirectColumnarConversionPossible()); - const auto do_work_just_lazy_columns = [num_columns, &rows, this]( - const size_t row_idx, - const std::vector& targets_to_skip) { + const auto do_write_only_lazy_columns = [num_columns, &rows, this]( + const size_t row_idx, + const std::vector& targets_to_skip) { const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip); for (size_t i = 0; i < num_columns; ++i) { - if (!targets_to_skip.empty() && !targets_to_skip[i]) { - writeBackCell(crt_row[i], row_idx - rows.getOffset(), i); + if (targets_to_skip.empty() || targets_to_skip[i]) { + continue; } + writeBackCell(crt_row[i], row_idx - rows.getOffset(), i); } }; @@ -639,47 +640,40 @@ void ColumnarResults::materializeAllLazyColumns( bool has_array = std::any_of(target_types_.begin(), target_types_.end(), [](const hdk::ir::Type* type) { return type->isArray(); }); - if (rows.areAnyColumnsLazyFetched() || !offset_buffers_.empty() || has_array) { - const size_t worker_count = - result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1; - std::vector> conversion_threads; - std::vector targets_to_skip; - if (skip_non_lazy_columns) { - CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns)); - targets_to_skip.reserve(num_columns); - for (size_t i = 0; i < num_columns; i++) { - // we process lazy and varlen columns (i.e., skip non-lazy and non-varlen columns) - targets_to_skip.push_back( - (lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) && - !target_types_[i]->isVarLen() && !target_types_[i]->isArray()); - } - } - size_t first = rows.getOffset(); - size_t last = rows.entryCount(); - if (rows.isTruncated()) { - last = std::min(last, first + rows.getLimit()); - } - for (auto interval : makeIntervals(first, last, worker_count)) { - conversion_threads.push_back(std::async( - std::launch::async, - [&do_work_just_lazy_columns, &targets_to_skip, first, this](const size_t start, - const size_t end) { - for (size_t i = start; i < end; ++i) { - do_work_just_lazy_columns(i, targets_to_skip); - } - }, - interval.begin, - interval.end)); - } + if (!rows.areAnyColumnsLazyFetched() && offset_buffers_.empty() && !has_array) { + return; + } - try { - for (auto& child : conversion_threads) { - child.wait(); - } - } catch (...) { - throw; + std::vector targets_to_skip{}; + if (skip_non_lazy_columns) { + CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns)); + targets_to_skip.reserve(num_columns); + for (size_t i = 0; i < num_columns; i++) { + // we process lazy and varlen columns (i.e., skip non-lazy and non-varlen columns) + bool skip_column = + (lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) && + !target_types_[i]->isVarLen() && !target_types_[i]->isArray(); + targets_to_skip.push_back(skip_column); } } + size_t first_row = rows.getOffset(); + size_t last_row = rows.entryCount(); + if (rows.isTruncated()) { + last_row = std::min(last_row, first_row + rows.getLimit()); + } + const size_t worker_count = + result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1; + // Heuristics, should be tuned somehow + size_t granularity = (last_row - first_row) / (worker_count * 3); + granularity = std::max(granularity, static_cast(10)); + + tbb::parallel_for(tbb::blocked_range(first_row, last_row, granularity), + [&do_write_only_lazy_columns, targets_to_skip, first_row, this]( + const tbb::blocked_range& interval) { + for (size_t i = interval.begin(); i < interval.end(); ++i) { + do_write_only_lazy_columns(i, targets_to_skip); + } + }); } /**