c++11 boost - 线程thread

2018-07-14 09:25:18

thread 库实现了操作系统线程,多线程可以让我们很好的利用现代的多核cpu。

线程在特定的中断点是可以被中断执行的,会抛出 thread_interrupted 异常,默认动作是终止。

thread 库的中断点共12个,它们都是函数:

thread::join()
thread::try_join_for()/try_join_until()
condition_variable::wait()
condition_variable::wait_for()/wait_until()
condition_variable_any::wait()
condition_variable_any::wait_for()/wait_until()
this_thread::sleep_for()/sleep_until()
this_thread::interruption_point()

// Copyright (c) 2015
// Author: Chrono Law
#include <iostream>
using namespace std;

//#define BOOST_THREAD_VERSION 4
#include <boost/thread.hpp>
using namespace boost;


#include <boost/chrono.hpp>
using namespace boost::chrono;
seconds operator"" _s(unsigned long long n)
{
    return seconds(n);
}

milliseconds operator"" _ms(unsigned long long n)
{
    return milliseconds(n);
}

void case1()
{
    {
        thread t1;  //空线程对象
        assert(!t1.joinable());

        thread t2([]{cout << "a thread" << endl;});
        assert(t2.joinable());

    }

    thread t1,t2;
    cout << t1.get_id() << endl;    //获取线程id对象
    assert(t1.get_id() == t2.get_id());

	//输出可并行的线程数量
    cout << thread::hardware_concurrency() << endl;
    cout << thread::physical_concurrency() << endl;

}

#include <boost/bind.hpp>
void dummy(int n)
{
    for(int i = 0;i < n; ++i);
    cout << n << endl;
}

void case2()
{
    //thread t1(dummy, 100);
    //thread t2(dummy, 500);

    thread t1(bind(dummy, 100));
    thread t2([]{dummy(500);});

    //this_thread::sleep_for(200_ms);
    t1.try_join_for(100_ms);  //等待线程终止,超时时间为100ms
    t2.join();
}

void case3()
{
    thread t1(dummy, 100);
    t1.detach();
    assert(!t1.joinable());

    thread(dummy, 1000).detach();  //分离,那么主线程就不能再控制次线程,也不需要join。
    this_thread::sleep_for(200_ms);  //睡眠
}

//////////////////////////////////////////
#include <boost/thread/thread_guard.hpp>
#include <boost/thread/scoped_thread.hpp>
void case4()
{
    thread t1(dummy, 200);
    thread t2(dummy, 300);

    thread_guard<detach> g1(t1);  //析构后线程继续执行
    thread_guard<>       g2(t2);  //析构后等待线程结束

    {
    	//退出作用域就析构
        scoped_thread<detach> t1(dummy, 10); //析构后线程继续运行
        scoped_thread<>       t2(dummy, 20); //析构后等待线程结束
    }

    this_thread::sleep_for(100_ms);
}

//线程中断实例,线程不是在任意点都会被中断的,必须设置中断点
void to_interrupt(int x)
try
{
    for (int i = 0;i < x; ++i)
    {
        //this_thread::sleep_for(400_ms);
        cout << i << endl;
        this_thread::interruption_point();  //设置中断点
    }
}
catch(const thread_interrupted& )
{
    cout << "thread_interrupted" << endl;
}

void case5()
{
    thread t(to_interrupt,10);
    //this_thread::sleep_for(1_s);

    t.interrupt();
    assert(t.interruption_requested());

    t.join();
}

//////////////////////////////////////////
void to_interrupt2(int x)
try
{
    using namespace this_thread;
    assert(interruption_enabled());

    for (int i = 0;i < x; ++i)
    {
        disable_interruption di;  //关闭中断
        assert(!interruption_enabled());
        cout << i << endl;
        cout << this_thread::interruption_requested() << endl;
        this_thread::interruption_point();

        restore_interruption ri(di);  //临时恢复中断
        assert(interruption_enabled());
        cout << "can interrupted" << endl;
        cout << this_thread::interruption_requested() << endl;  //是否要求被中断
        this_thread::interruption_point();
    }
    //在这里恢复可线程可中断状态

    assert(interruption_enabled());
}
catch(const thread_interrupted& )
{
    cout << "[thread_interrupted]" << endl;
}

void case6()
{
    thread t(to_interrupt2,10);
    //this_thread::sleep_for(1_s);

    t.interrupt();
    assert(t.interruption_requested());

    t.join();
}
//线程组
void case7()
{
    thread_group tg;
    tg.create_thread(bind(dummy, 100));
    tg.create_thread(bind(dummy, 200));
    tg.join_all();
}
//call_once 可以保证多个线程只有一个线程能调用成功。
int g_count;
void init_count(int x)
{
    cout << "should call once." << endl;
    g_count = x;
}

void call_func()
{
    static once_flag once;
    call_once(once, init_count, 10);
}

void case8()
{
    (scoped_thread<>(call_func));
    (scoped_thread<>(call_func));
}

int main()
{
    case1();
    case2();
    case3();
    case4();
    case5();
    case6();
    case7();
    case8();
}
[root@192 c++]# g++ -std=c++11 -L/usr/local/lib/boost_1_65_1/ -lboost_thread -lboost_system -lboost_chrono -lpthread  main.cpp
[root@192 c++]# ./a.out 
{Not-any-thread}
a thread
2
2
100
500
1000
100
300
20
200
10
0
thread_interrupted
0
1
can interrupted
1
[thread_interrupted]
100
200
should call once.


例子-2

// Copyright (c) 2015
// Author: Chrono Law
#include <iostream>
#include <stack>
using namespace std;

//#define BOOST_THREAD_PROVIDES_VARIADIC_THREAD
//#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_VERSION 5
#include <boost/bind.hpp>
#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/thread/lock_factories.hpp>
using namespace boost;

#include <boost/chrono.hpp>
using namespace boost::chrono;
seconds operator"" _s(unsigned long long n)
{
    return seconds(n);
}

milliseconds operator"" _ms(unsigned long long n)
{
    return milliseconds(n);
}

//条件变量例子,生产者,消费者
class buffer
{
    private:
        mutex mu;
        condition_variable_any cond_put;
        condition_variable_any cond_get;

        stack<int> stk;
        int un_read,capacity;

        bool is_full()
        {   return un_read == capacity; }

        bool is_empty()
        {   return un_read == 0 ;   }
    public:
        buffer(size_t n):un_read(0),capacity(n){}

        void put(int x)
        {
            {
                auto lock = make_unique_lock(mu);
                cond_put.wait(lock,
                    [this]{return un_read < capacity;});
                //for(;is_full();)
                //{
                //    cout << "full waiting... "  << endl;
                //    cond_put.wait(lock);
                //}
                stk.push(x);
                ++un_read;
            }
            cond_get.notify_one();
        }

        void get(int *x)
        {
            {
                auto lock = make_unique_lock(mu);
                cond_get.wait(lock,
                    [this]{return un_read > 0;});
                //for(;is_empty();)
                //{
                //    cout << "empty waiting... " << endl;
                //    cond_get.wait(lock);
                //}
                --un_read;
                *x = stk.top();
                stk.pop();
            }
            cond_put.notify_one();
        }
};

buffer buf(5);

void producer(int n)
{
    for (int i = 0;i < n; ++i)
    {
        cout << "put " << i << endl;
        buf.put(i);
    }
}

void consumer( int n)
{
    int x;
    for (int i = 0;i < n; ++i)
    {
        buf.get(&x);
        cout << "get " << x << endl;
    }
}


void case1()
{
    thread_group tg;

    tg.create_thread(bind(producer, 20));
    tg.create_thread(bind(consumer, 10));
    tg.create_thread(bind(consumer, 10));

    tg.join_all();
}

//future 提供了异步操作线程的返回值
//#include <future>
void dummy(int n)
{
    for(int i = 0;i < n; ++i);
    cout << n << endl;
}
void case2()
{
    //auto x = async(&dummy, 10);
    //x.wait();

    boost::async(bind(dummy, 10));  

    auto f = boost::async([]{cout << "hello" << endl;});
    f.wait();  //等待线程执行完毕

    async(launch::async, dummy, 100);  //立即执行
}

//////////////////////////////////////////
int fab(int n)
{
    if (n == 0 || n == 1)
    {   return 1;   }
    return fab(n-1) + fab(n-2);
}

void case3()
{
    auto f5 = async(fab, 5);  //fab(5),执行实际不确定
    auto f7 = async(launch::async, fab, 7);  //立即执行fab(5)

    cout << f5.get() + f7.get() << endl;  //阻塞直到线程执行完毕,后去返回值
    assert(!f5.valid() && !f7.valid());

    auto f10 = async(fab, 10);
    auto s = f10.wait_for(100_ms);  //等待100ms并返回计算结果

    if(f10.valid())  //已经计算完成
    {
        assert(s == future_status::ready);
        cout << f10.get() << endl;
    }

    vector<boost::future<int>> vec;
    for(int i = 0;i < 5; ++i)
    {
        vec.push_back(async(fab, i + 10));
    }

    wait_for_any(vec[3], vec[4], vec[2]);  //等待任意的计算结果并返回
    for(auto& x : vec)
    {
        if(x.valid())
        {   cout << x.get() << endl;        }
    }

    //wait_for_all(vec.begin(), vec.end());  //等待所有计算结束

    //for(auto& x : vec)
    //{
    //    cout << x.get() << ',';
    //}
    cout << endl;
}

//可以多次调用 get() 后去结果
void case4()
{
    //shared_future<int> f5 = async(fab, 5);
    auto f5 = async(fab, 5).share();
    //cout << f5.get() << endl;

    auto func = [](decltype(f5) f){
        cout << "[" << f.get() << "]";
        };

    async(func, f5);
    async(func, f5);

    this_thread::sleep_for(100_ms);

    assert(f5.valid());
}

void case5()
{
    auto func = [](int n, promise<int>& p){
        p.set_value(fab(n));  //设置要返回的值
    };

    promise<int> p;

    thread(func, 10, boost::ref(p)).detach();

    auto f = p.get_future();  //产生future对象
    cout << f.get() << endl;

}

void case6()
{
    boost::atomic<int> x;
    barrier br(5);

    auto func = [&](){
        cout << "thread"<< ++x <<" arrived barrier." << endl;
        br.wait();  //必须5个线程都到达此处才能继续执行
        cout << "thread run."  << endl;
    };

    thread_group tg;
    for (int i = 0;i < 5;++i)  //创建5个线程
    {
        tg.create_thread(func);
    }
    tg.join_all();
}

//线程本地存储
//每个线程都有一份自己的拷贝。
void case7()
{
    thread_specific_ptr<int> pi;  //线程本地存储一个整数

    auto func = [&]{
        pi.reset(new int()); //函数赋值

        ++(*pi);
        cout << "thread v=" << *pi <<  endl;
    };
    async(func);
    async(func);

    this_thread::sleep_for(100_ms);
}

int main()
{
    cout << "thread v=" << BOOST_THREAD_VERSION << endl;
    //case1();
    case2();
    case3();
    case4();
    case5();
    case6();
    case7();
    //case8();
}
[root@192 c++]# g++ -std=c++11 -L/usr/local/lib/boost_1_65_1/ -lboost_thread -lboost_system -lboost_chrono -lpthread  main.cpp
[root@192 c++]# ./a.out 
thread v=4
hello
10
100
29
89
89
144
233
377
610

[8][8]89
thread-999923791 arrived barrier.
thread-999923790 arrived barrier.
thread-999923789 arrived barrier.
thread-999923788 arrived barrier.
thread-999923787 arrived barrier.
thread run.
thread run.
thread run.
thread run.
thread run.
thread v=1
thread v=1


 备注

1.编译器版本gcc4.8.5,运行环境centos7 64位
2.本文只做简单记录用,详细用法请参考 Boost Library,或者是罗剑锋的 boost程序库完全开发指南 书本
3..原文地址http://www.freecls.com/a/2712/b5


©著作权归作者所有
收藏
推荐阅读
简介
天降大任于斯人也,必先苦其心志。