linux c线程(2)-同步和线程取消

2018-06-13 13:07:00

本篇将介绍线程用来同步彼此行为的两个工具:互斥量和条件变量。

我们先来看一个例子。

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>

static volatile int glob = 0;

static void * threadFunc(void *arg){
    int loops = *((int *) arg);
    int loc, j;

    for (j = 0; j < loops; j++) {
        loc = glob;
        loc++;
        glob = loc;
    }

    return NULL;
}

int main(int argc, char *argv[]){
    pthread_t t1, t2;
    int loops, s;

    loops = (argc > 1) ? atoi(argv[1]) : 10000000;

    s = pthread_create(&t1, NULL, threadFunc, &loops);
    if (s != 0)
        perror("pthread_create");
    s = pthread_create(&t2, NULL, threadFunc, &loops);
    if (s != 0)
        perror("pthread_create");

    s = pthread_join(t1, NULL);
    if (s != 0)
        perror("pthread_join");
    s = pthread_join(t2, NULL);
    if (s != 0)
        perror("pthread_join");

    printf("glob = %d\n", glob);
    exit(EXIT_SUCCESS);
}
[root@izj6cfw9yi1iqoik31tqbgz c]# gcc main.c -lpthread
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 10000
glob = 20000
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 100000
glob = 200000
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 1000000
glob = 2000000
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 10000000
glob = 11914942

从结果可以看出,一旦把累加数调大,结果就不是我们预期的了。那是因为当累加数少时,cpu还没切换,一个线程就已经完成了所有的累加,那就没有所谓的竞争了。不多解释,给读者结合下图自己思考。


静态分配的互斥量

互斥量既可以像静态变量那样分配,也可以在运行时动态创建。动态互斥量创建稍微有点复杂,这里只讲解静态分配的。互斥量属于pthread_mutex_t类型的变量。使用前必须初始化。

加锁和解锁互斥量

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

//Both return 0 on success, or a positive error number on error

如果互斥量当前处于未锁定状态,该调用将锁定互斥量并立即返回。如果其他线程已经锁定了,则阻塞,直至该互斥量被解锁。在Linux上如果互斥量已经被自己先前锁定,那么将产生死锁。pthread_mutex_unlock()对于未锁定的状态的互斥量进行解锁,或者解锁其他线程锁定的互斥量都将会出错。

int pthread_mutex_trylock(pthread_mutex_t *mutex);

//return 0 on success

非阻塞 pthread_mutex_trylock() 对已经被锁定的将失败并返回EBUSY错误。其他行为和pthread_mutex_lock()一样。

例子

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>

static volatile int glob = 0;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

static void * threadFunc(void *arg){
    int loops = *((int *) arg);
    int loc, j, s;

    for (j = 0; j < loops; j++) {
        s = pthread_mutex_lock(&mtx);
        if (s != 0)
            perror("pthread_mutex_lock");

        loc = glob;
        loc++;
        glob = loc;

        s = pthread_mutex_unlock(&mtx);
        if (s != 0)
            perror("pthread_mutex_unlock");
    }

    return NULL;
}

int main(int argc, char *argv[]){
    pthread_t t1, t2;
    int loops, s;

    loops = (argc > 1) ? atoi(argv[1]) : 10000000;

    s = pthread_create(&t1, NULL, threadFunc, &loops);
    if (s != 0)
        perror("pthread_create");
    s = pthread_create(&t2, NULL, threadFunc, &loops);
    if (s != 0)
        perror("pthread_create");

    s = pthread_join(t1, NULL);
    if (s != 0)
        perror("pthread_join");
    s = pthread_join(t2, NULL);
    if (s != 0)
        perror("pthread_join");

    printf("glob = %d\n", glob);
    exit(EXIT_SUCCESS);
}
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 10000000
glob = 20000000

利用互斥量会消耗一定的性能为代价,不过通常情况下,线程会花费更多时间去做其他工作,对互斥量的加锁和解锁操作相对要少的多,因此使用互斥量对于大部分程序的性能影响不大。

互斥量死锁

有时候,一个线程需要同时访问两个或以上更多不同的共享资源,而每个资源又都由不同的互斥量管理。当超过一个线程加锁同一组互斥量时,就有可能发生死锁。如下图,其中线程A成功的锁住mutex1,而线程B成功锁住mutex2,接着试图对已为另一线程锁定的互斥量加锁,那么两个线程将无限期的等待下去。


通知状态的改变:条件变量

互斥量防止多个线程访问同一共享变量。条件变量允许一个线程就某个共享变量的状态变化通知其他线程,并让其他线程等待这一通知。

跟互斥量一样,条件变量的分配有静态和动态之分。静态分配初始化如下

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#include <pthread.h>

int pthread_cond_signal(pthread_cond_t *cond);    //单个
int pthread_cond_broadcast(pthread_cond_t *cond);  //全部

//调用前加锁
//调用时内部会自动解锁并等待信号
//返回时内部又会自动加锁
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

//All return 0 on success, or a positive error number on error
//支持只阻塞一定时间,如果在指定时间内无相关条件变量通知
//则失败并返回ETIMEOUT错误
#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime);

//Returns 0 on success, or a positive error number on error

例子

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
typedef enum { FALSE, TRUE } Boolean;
static int avail = 0;

static void * threadFunc(void *arg){
    int cnt = atoi((char *) arg);
    int s, j;

    for (j = 0; j < cnt; j++) {
        sleep(1);

        /* Code to produce a unit omitted */

        s = pthread_mutex_lock(&mtx);
        if (s != 0)
            perror("pthread_mutex_lock");

        avail++;        /* Let consumer know another unit is available */

        s = pthread_mutex_unlock(&mtx);
        if (s != 0)
            perror("pthread_mutex_unlock");

        s = pthread_cond_signal(&cond);         /* Wake sleeping consumer */
        if (s != 0)
            perror("pthread_cond_signal");
    }

    return NULL;
}

int main(int argc, char *argv[]){
    pthread_t tid;
    int s, j;
    int totRequired;  //所有线程生产的总数
    int numConsumed;  //目前消费的总数
    Boolean done;
    time_t t;

    t = time(NULL);

    /* Create all threads */

    totRequired = 0;
    for (j = 1; j < argc; j++) {
        totRequired += atoi(argv[j]);

        s = pthread_create(&tid, NULL, threadFunc, argv[j]);
        if (s != 0)
            perror("pthread_create");
    }

    /* Loop to consume available units */

    numConsumed = 0;
    done = FALSE;

    for (;;) {
        s = pthread_mutex_lock(&mtx);
        if (s != 0)
            perror("pthread_mutex_lock");

        //等待
        while (avail == 0) {
            //解锁并等待信号
            //返回时又自动加锁
            s = pthread_cond_wait(&cond, &mtx);
            if (s != 0)
                perror("pthread_cond_wait");
        }

        /* At this point, 'mtx' is locked... */

        while (avail > 0) {             /* Consume all available units */

            /* Do something with produced unit */

            numConsumed ++;
            avail--;
            printf("T=%ld: numConsumed=%d\n", (long) (time(NULL) - t),
                    numConsumed);

            done = numConsumed >= totRequired;
        }

        s = pthread_mutex_unlock(&mtx);
        if (s != 0)
            perror("pthread_mutex_unlock");

        if (done)
            break;

        /* Perhaps do other work here that does not require mutex lock */

    }

    exit(EXIT_SUCCESS);
}
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 5 4
T=1: numConsumed=1
T=1: numConsumed=2
T=2: numConsumed=3
T=2: numConsumed=4
T=3: numConsumed=5
T=3: numConsumed=6
T=4: numConsumed=7
T=4: numConsumed=8
T=5: numConsumed=9


线程取消

所谓线程取消就是一个线程可以命令另一个线程终止。

#include <pthread.h>

int pthread_cancel(pthread_t thread);

//Returns 0 on success, or a positive error number on error

取消状态及类型

#include <pthread.h>

int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);

//Both return 0 on success, or a positive error number on error

state取值如下:

PTHREAD_CANCEL_DISABLE 线程不可取消,收到的取消请求将挂起,直到将线程取消状态置为启用。
PTHREAD_CANCEL_ENABLE 线程可以取消(默认)

如果线程设为可以取消,那么type值可以为如下:

PTHREAD_CANCEL_ASYNCHRONOUS 可能会在任何点取消线程。异步取消应用场景很少。
PTHREAD_CANCEL_DEFERRED 延迟取消(取消请求挂起),直至到达取消点。

取消点

若线程的取消性状态和类型分别置为启动和延迟,仅当线程抵达某个取消点时,取消请求才会生效。

除了这些,SUSv3 还指定了大量函数,其中包括stdio函数、dlopen API、syslog API、nftw()、popen()、semop()、unlink(),

例子

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>

static void * threadFunc(void *arg){
    int j;

    printf("New thread started\n");     /* May be a cancellation point */
    for (j = 1; ; j++) {
        printf("Loop %d\n", j);         /* May be a cancellation point */
        sleep(1);                       /* A cancellation point */
    }

    /* NOTREACHED */
    return NULL;
}

int main(int argc, char *argv[]){
    pthread_t thr;
    int s;
    void *res;

    s = pthread_create(&thr, NULL, threadFunc, NULL);
    if (s != 0)
        perror("pthread_create");

    sleep(3);                           /* Allow new thread to run a while */

    s = pthread_cancel(thr);
    if (s != 0)
        perror("pthread_cancel");

    s = pthread_join(thr, &res);
    if (s != 0)
        perror("pthread_join");

    if (res == PTHREAD_CANCELED)
        printf("Thread was canceled\n");
    else
        printf("Thread was not canceled (should not happen!)\n");

    exit(EXIT_SUCCESS);
}
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 
New thread started
Loop 1
Loop 2
Loop 3
Thread was canceled


检测取消挂起并主动被取消

#include <pthread.h>

void pthread_testcancel(void);

当线程已有处于挂起状态的取消请求,那么只要调用此函数,线程立即终止。

清理函数

在线程被取消之前要执行一些额外的清理动作的函数

#include <pthread.h>

void pthread_cleanup_push(void (*routine)(void*), void *arg);
void pthread_cleanup_pop(int execute);

pthread_cleanup_pop() 会从清理函数最顶层函数栈中移除清理函数,如果execute大于0,则会执行清理动作。这两个可以为宏定义。

例子

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>

static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static int glob = 0;                    /* Predicate variable */

static void cleanupHandler(void *arg){
    int s;

    printf("cleanup: freeing block at %p\n", arg);
    free(arg);

    printf("cleanup: unlocking mutex\n");
    s = pthread_mutex_unlock(&mtx);
    if (s != 0)
        perror("pthread_mutex_unlock");
}

static void * threadFunc(void *arg){
    int s;
    void *buf = NULL;                   /* Buffer allocated by thread */

    buf = malloc(0x10000);              /* Not a cancellation point */
    printf("thread:  allocated memory at %p\n", buf);

    s = pthread_mutex_lock(&mtx);       /* Not a cancellation point */
    if (s != 0)
        perror("pthread_mutex_lock");

    pthread_cleanup_push(cleanupHandler, buf);

    while (glob == 0) {
        s = pthread_cond_wait(&cond, &mtx);     /* A cancellation point */
        if (s != 0)
            perror("pthread_cond_wait");
    }

    printf("thread:  condition wait loop completed\n");
    pthread_cleanup_pop(1);             /* Executes cleanup handler */
    return NULL;
}

int main(int argc, char *argv[]){
    pthread_t thr;
    void *res;
    int s;

    s = pthread_create(&thr, NULL, threadFunc, NULL);
    if (s != 0)
        perror("pthread_create");

    sleep(2);                   /* Give thread a chance to get started */

    if (argc == 1) {            /* Cancel thread */
        printf("main:    about to cancel thread\n");
        s = pthread_cancel(thr);
        if (s != 0)
            perror("pthread_cancel");

    } else {                    /* Signal condition variable */
        printf("main:    about to signal condition variable\n");
        glob = 1;
        s = pthread_cond_signal(&cond);
        if (s != 0)
            perror("pthread_cond_signal");
    }

    s = pthread_join(thr, &res);
    if (s != 0)
        perror("pthread_join");
    if (res == PTHREAD_CANCELED)
        printf("main:    thread was canceled\n");
    else
        printf("main:    thread terminated normally\n");

    exit(EXIT_SUCCESS);
}
[root@izj6cfw9yi1iqoik31tqbgz c]# ./a.out 
thread:  allocated memory at 0x7f48540008c0
main:    about to cancel thread
cleanup: freeing block at 0x7f48540008c0
cleanup: unlocking mutex
main:    thread was canceled


备注
1.编译器版本gcc4.8,运行环境centos7 64位
2.原文地址http://www.freecls.com/a/2712/55

©著作权归作者所有
收藏
推荐阅读
  • err
    linux c线程(1)-基础

    与进程类似,线程是允许应用程序并发执行多个任务的一种机制。一个进程可以包含多个线程。线程间共享同一份全局内存区域,其中包括初始化数据段,未初始化数据段,堆内存。传统uni...

  • err
    linux c程序的执行

    执行新程序:execve系统调用 execve() 可以将新程序加载到某一进程的内存空间。进程的栈、数据段、堆都会被新程序的相应部件所替换。由fork() 生成的子进程对...

  • err
    linux c监控子进程

    接下来描述两种监控子进程的技术:系统调用wait()及其变体,SIGCHLD信号。系统调用wait()#include &lt;sys/wait.h&gt; pid_t...

  • err
    linux c进程的创建、终止

    对进程不是很了解的同学可以参考linux 进程这篇文章。进程的创建系统调用fork() 创建一新进程(子进程)。#include &lt;unistd.h&gt; pi...

  • linux c定时器与休眠

    定时器是进程规划自己在未来某一时刻接获通知的一种机制。休眠则能是进程(或线程)暂停执行一段时间。间隔定时器系统调用 setitimer() 创建一个间隔式定时器,一定时间后到期,这个定时器也可以每个一...

  • nginx模块 ngx_http_headers_module

    ngx_http_headers_module 模块是用来增加 Expires 和 Cache-control,或者是任意的响应头。Syntax: add_header name value [alw...

  • nginx模块 ngx_http_gunzip_module、ngx_http_gzip_module、ngx_http_gzip_static_module

    ngx_http_gunzip_module 模块将文件解压缩后并在响应头加上 "Content-Encoding: gzip" 返回给客户端。为了解决客户端不支持gzip压缩。编译的时候带上 --w...

  • nginx模块 ngx_http_flv_module、ngx_http_mp4_module

    ngx_http_flv_module模块提供了对 flv 视频的伪流支持。编译的时候带上 --with-http_flv_module。它会根据指定的 start 参数来指定跳过多少字节,并在返回数...

  • nginx模块 ngx_http_fastcgi_module

    ngx_http_fastcgi_module 模块使得nginx可以与 fastcgi 服务器通信。比如目前要使得 nginx 支持 php 就得使用 fastcgi技术,在服务器上装上 nginx...

  • nginx模块 ngx_http_autoindex_module

    ngx_http_autoindex_module 模块可以将uri以 / 结尾时,列出里面的文件和目录。Syntax: autoindex on | off; Default: autoindex ...

简介
天降大任于斯人也,必先苦其心志。