oneTBB 并行编程框架

(8 mins to read)

安装

1
git clone --depth 1 https://github.com/oneapi-src/oneTBB.git
1
2
3
4
mkdir build && cd build
cmake ..
make -j 4
make install

make过程中可能出现如下问题

1
2
c++: fatal error: Killed signal terminated program cc1plus
compilation terminated.

原因就是内存太小了,减小并行度或者临时换页 (参考

计时

1
2
3
4
5
6
7
8
tbb::tick_count::now();
tick_count t0 = tick_count::now();
tick_count t1 = tick_count::now();
printf(“work took %g seconds\n”,(t1-t0).seconds());

auto start = std::chrono::steady_clock::now();
auto end = std::chrono::steady_clock::now();
std::cerr << std::chrono::duration<double, std::milli>(end - start).count() << '\n';

只记时一次,容易出现波动

使用google benchmark,可以跑多次求平均值

控制运行的线程数

Thread local storage:thread_local表明该变量的作用域是线程,每个线程都有一份拷贝。

对于TBB而言,你不知道任务和线程之间的对应关系。

enumerable_thread_specific:provides thread local storage that acts like anSTL container with one element per thread. The container permits iteratingover the elements using the usual STL iteration idioms. Any thread can iterateover all the local copies, seeing the other threads localdata.(每个线程有一份拷贝,并且所有拷贝被组织成一个容器,可以顺序遍历容器来结合最终结果)

1
2
3
4
5
6
7
8
9
using ets_vector_t = enumerable_thread_specific<vector<int>>;
ets_vector_t partial(N);
parallel_for(0, 10, [&](int i) {
ets_vector_t::reference local = partial.local();
for (int& x : local) x = 20;
});
vector<int> sum(N);
for (auto& it : partial)
for (int j = 0; j < N; ++j) sum[j] += it[j];

用法就是包装一个tbb::enumerable_thread_specific,然后在内部使用local得到自己的那份拷贝,此外可以遍历整个容器,得到每一份拷贝。(如果该线程第一次调用local,就会创建一份,否则会使用自己已经创建的那份)

可以使用size()得到有几个拷贝,但是该容器只支持顺序遍历,不能随机访问(不支持[])。所以如果指定线程想查看对方的私有拷贝,这是不行的,你既不知道它的下标,也没办法直接访问

combinable:provides thread local storage for holding per-threadsubcomputations that will later be reduced to a single result. Each thread canonly see its local data or, after calling combine, the combined data.(用于存储中间结果,只能看到自己的数据)

1
2
3
4
5
6
7
8
combinable<vector<int>> partial{[](){return vector<int>(N); }};
parallel_for(0, 10, [&](int i) {
auto& local = partial.local();
});
vector<int> sum(N);
partial.combine_each([&](const vector<int>& a) {
for (int i = 0; i < N; ++i) sum[i] += a[i];
});

combinable需要传入一个lambda,在local中会调用,用于初始化。

通过combine_each将每份拷贝的结果合并起来。

note:个人感觉这两者差不多,combine_each其实就等价于顺序遍历拷贝构成的容器


Task

parallel_invoke(f1,f2...)(隐式的task spawning和barrier)

1
2
3
4
5
task_group g;
g.run(f1);
g.run(f2);
...
g.wait();

run_and_wait(f):we avoid the overhead of enqueueing-schedulingdequeuingsteps, and second, we avoid the potential stealing that can happen while thetask is in the queue.

cancel()

如果有大量任务,线性spawn不如递归spawn(满二叉的形式)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
parallel_for(0, n, [](int i) {

});
parallel_for(blocked_range<int>(0, n), [&](const auto& range r) {
for (int i = r.begin(); i != r.end(); ++i) {

}
});
parallel_reduce(blocked_range<int>(0, n), init, // 规约
[&](const auto& range r, auto pre) {

},
[&](auto x, auto y) { // combine

};
);
parallel_scan(blocked_range<int>(0, n), init, // 前缀规约
[&](const auto& range r, auto pre, bool is_final_scan) {
if (is_final_scan) {
// write
}
},
[&](auto x, auto y) { // combine

};
);

并行过滤:考虑对一个vector执行一个过滤条件。我目前认为最好的写法是先resize足够大的空间,然后通过一个atomic记录结果vector的大小

1
2
size_t local_base = res_size.fetch_add(local_size);
for (size_t i = 0; i < local_size; i++) res[local_base + i] = local_res[i];

POD_vector(?):上述情况中必须先resize,不能reserve,而resize的一个问题是会对所有元素做一遍memset,有时候这是很多余的,所以可以通过一些trick避免这个初始化的过程。