ACL庫開發(fā)高并發(fā)半駐留式線程池程序

字號:

一、概述
    在當(dāng)今強(qiáng)調(diào)多核開發(fā)的年代,要求程序員能夠?qū)懗龈卟l(fā)的程序,而利用多個核一般有兩種方式:采用多線程方式或多進(jìn)程方式。每處理一個新任務(wù)時如果臨時產(chǎn)生一個線程或進(jìn)程且處理完任務(wù)后線程或進(jìn)程便立即退出,顯示這種方式是非常低效的,于是人們一般采用線程池的模型(這在JAVA或 .NET 中非常普遍)或多進(jìn)程進(jìn)程池模型(這一般在UNIX平臺應(yīng)用較多)。此外,對于線程池或進(jìn)程池模型又分為兩種情形:常駐留內(nèi)存或半駐留內(nèi)存,常駐內(nèi)存是指預(yù)先產(chǎn)生一批線程或進(jìn)程,等待新任務(wù)到達(dá),這些線程或進(jìn)程即使在空閑狀態(tài)也會常駐內(nèi)存;半駐留內(nèi)存是指當(dāng)來新任務(wù)時如果線程池或進(jìn)程池沒有可利用線程或進(jìn)程則啟動新的線程或進(jìn)程來處理新任務(wù),處理完后,線程或進(jìn)程并不立即退出,而是空閑指定時間,如果在空閑閥值時間到達(dá)前有新任務(wù)到達(dá)則立即處理新任務(wù),如果到達(dá)空閑超時后依然沒有新任務(wù)到達(dá),則這些空閑的線程或進(jìn)程便退出,以讓出系統(tǒng)資源。所以,對比常駐內(nèi)存方式和半駐留內(nèi)存方式,不難看出半駐留方式更有按需分配的意味。
    下面僅以ACL框架中的半駐留線程池模型為例介紹了如何寫一個半駐留線程池的程序。
    二、半駐留線程池例子
    2.1)程序代碼
    #include "lib_acl.h"
    #include
    /**
    * 用戶自定義數(shù)據(jù)結(jié)構(gòu)
    */
    typedef struct THREAD_CTX {
    int i;
    } THREAD_CTX;
    /* 全局性靜態(tài)變量 */
    static ACL_WORK_QUEUE *__wq = NULL;
    /* 線程局部存儲變量(C99支持此種方式聲明,方便許多) */
    static __thread unsigned int __local = 0;
    static void workq_thread_fn(int event acl_unused, void *arg)
    {
    ACL_WORKER_ATTR *worker_attr = (ACL_WORKER_ATTR*) arg; /* 獲得當(dāng)前工作線程的屬性變量 */
    THREAD_CTX *ctx = (THREAD_CTX*) worker_attr->run_data; /* 獲得用戶自定義對象 */
    int i = 5;
    while (i-- > 0) {
    printf("thread start! tid=%d, i=%d, __local=%d\r\n",
    acl_pthread_self(), ctx->i, __local);
    /* 在本線程中將線程局部變量加1 */
    __local++;
    sleep(1);
    }
    acl_myfree(ctx);
    /* 至此,該工作線程進(jìn)入空閑狀態(tài),直到空閑超時退出 */
    }
    static int __on_thread_init(void *arg, ACL_WORKER_ATTR *attr)
    {
    const char *myname = "__on_thread_init";
    ACL_WORK_QUEUE *wq = (ACL_WORK_QUEUE*) arg;
    /* 判斷一下,僅是為了驗證參數(shù)傳遞過程 */
    assert(wq == __wq);
    printf("%s: thread(%d) init now\r\n", myname, acl_pthread_self());
    /* 返回0表示繼續(xù)執(zhí)行該線程獲得的新任務(wù),返回-1表示停止執(zhí)行該任務(wù) */
    return (0);
    }
    static void __on_thread_exit(void *arg, ACL_WORKER_ATTR *attr)
    {
    const char *myname = "__on_thread_exit";
    ACL_WORK_QUEUE *wq = (ACL_WORK_QUEUE*) arg;
    /* 判斷一下,僅是為了驗證參數(shù)傳遞過程 */
    assert(wq == __wq);
    printf("%s: thread(%d) exit now\r\n", myname, acl_pthread_self());
    }
    static void run_thread_pool(ACL_WORK_QUEUE *wq)
    {
    THREAD_CTX *ctx; /* 用戶自定義參數(shù) */
    /* 設(shè)置全局靜態(tài)變量 */
    __wq = wq;
    /* 設(shè)置線程開始時的回調(diào)函數(shù) */
    (void) acl_workq_atinit(wq, __on_thread_init, wq);
    /* 設(shè)置線程退出時的回調(diào)函數(shù) */
    (void) acl_workq_atfree(wq, __on_thread_exit, wq);
    ctx = (THREAD_CTX*) acl_mycalloc(1, sizeof(THREAD_CTX));
    assert(ctx);
    ctx->i = 0;
    /**
    * 向線程池中添加第一個任務(wù),即啟動第一個工作線程
    * @param wq 線程池句柄
    * @param workq_thread_fn 工作線程的回調(diào)函數(shù)
    * @param event_type 此處寫0即可
    * @param ctx 用戶定義參數(shù)
    */
    acl_workq_add(wq, workq_thread_fn, 0, ctx);
    sleep(1);
    ctx = (THREAD_CTX*) acl_mycalloc(1, sizeof(THREAD_CTX));
    assert(ctx);
    ctx->i = 1;
    /* 向線程池中添加第二個任務(wù),即啟動第二個工作線程 */
    acl_workq_add(wq, workq_thread_fn, 0, ctx);
    }
    int main(int argc acl_unused, char *argv[] acl_unused)
    {
    ACL_WORK_QUEUE *wq;
    int max_threads = 20; /* 最多并發(fā)20個線程 */
    int idle_timeout = 10; /* 每個工作線程當(dāng)空閑10秒后自動退出 */
    /* 創(chuàng)建半駐留線程句柄 */
    wq = acl_workq_create(max_threads, idle_timeout, NULL, NULL);
    assert(wq);
    run_thread_pool(wq);
    /* 主線程等待用戶在終端輸入任意字符后退出 */
    getchar();
    /* 銷毀線程池對象 */
    acl_workq_destroy(wq);
    return (0);
    }
    2.2)編譯鏈接
    從 http://www.sourceforge.net/projects/acl/ 站點下載 acl_project 代碼,在WIN32平臺下請用VC2003編譯,打開 acl_project\win32_build\vc\acl_project_vc2003.sln 編譯后在目錄 acl_project\dist\lib_acl\lib\win32 下生成lib_acl_vc2003.lib, 然后在示例的控制臺工程中包含該庫,并包含acl_project\lib_acl\include 下的 lib_acl.h 頭文件,編譯上述源代碼即可。
    另外,還可以查看ACL的在線幫助文檔:http://acl.sourceforge.net/acl_help/index.html
    2.3) 運(yùn)行
    運(yùn)行示例程序后,在我的機(jī)器的顯示結(jié)果如下:
    __on_thread_init: thread(14016) init now
    thread start! tid=14016, i=0, __local=0
    thread start! tid=14016, i=0, __local=1
    __on_thread_init: thread(8792) init now
    thread start! tid=8792, i=1, __local=0
    thread start! tid=14016, i=0, __local=2
    thread start! tid=8792, i=1, __local=1
    thread start! tid=8792, i=1, __local=2
    thread start! tid=14016, i=0, __local=3
    thread start! tid=14016, i=0, __local=4
    thread start! tid=8792, i=1, __local=3
    thread start! tid=8792, i=1, __local=4
    __on_thread_exit: thread(14016) exit now
    __on_thread_exit: thread(8792) exit now