亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

nginx源碼分析線程池詳解

發布時間:2020-10-01 17:07:16 來源:腳本之家 閱讀:147 作者:lqh 欄目:服務器

nginx源碼分析線程池詳解

一、前言

     nginx是采用多進程模型,master和worker之間主要通過pipe管道的方式進行通信,多進程的優勢就在于各個進程互不影響。但是經常會有人問道,nginx為什么不采用多線程模型(這個除了之前一篇文章講到的情況,別的只有去問作者了,HAHA)。其實,nginx代碼中提供了一個thread_pool(線程池)的核心模塊來處理多任務的。下面就本人對該thread_pool這個模塊的理解來跟大家做些分享(文中錯誤、不足還請大家指出,謝謝) 

二、thread_pool線程池模塊介紹

     nginx的主要功能都是由一個個模塊構成的,thread_pool也不例外。線程池主要用于讀取、發送文件等IO操作,避免慢速IO影響worker的正常運行。先引用一段官方的配置示例

Syntax: thread_pool name threads=number [max_queue=number];
Default: thread_pool default threads=32 max_queue=65536;
Context: main

     根據上述的配置說明,thread_pool是有名字的,上面的線程數目以及隊列大小都是指每個worker進程中的線程,而不是所有worker中線程的總數。一個線程池中所有的線程共享一個隊列,隊列中的最大人數數量為上面定義的max_queue,如果隊列滿了的話,再往隊列中添加任務就會報錯。 

     根據之前講到過的模塊初始化流程(在master啟動worker之前) create_conf--> command_set函數-->init_conf,下面就按照這個流程看看thread_pool模塊的初始化

/******************* nginx/src/core/ngx_thread_pool.c ************************/
//創建線程池所需的基礎結構
static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
{
  ngx_thread_pool_conf_t *tcf;
   //從cycle->pool指向的內存池中申請一塊內存
  tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
  if (tcf == NULL) {
    return NULL;
  }
   
   //先申請包含4個ngx_thread_pool_t指針類型元素的數組
   //ngx_thread_pool_t結構體中保存了一個線程池相關的信息
  if (ngx_array_init(&tcf->pools, cycle->pool, 4,
            sizeof(ngx_thread_pool_t *))
    != NGX_OK)
  {
    return NULL;
  }
 
  return tcf;
}
 
//解析處理配置文件中thread_pool的配置,并將相關信息保存的ngx_thread_pool_t中
static char * ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
  ngx_str_t     *value;
  ngx_uint_t     i;
  ngx_thread_pool_t *tp;
 
  value = cf->args->elts;
 
  //根據thread_pool配置中的name作為線程池的唯一標識(如果重名,只有第一個有效)
  //申請ngx_thread_pool_t結構保存線程池的相關信息
  //由此可見,nginx支持配置多個name不同的線程池
  tp = ngx_thread_pool_add(cf, &value[1]);
  .......
  //處理thread_pool配置行的所有元素
  for (i = 2; i < cf->args->nelts; i++) {
    //檢查配置的線程數
    if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
     .......
    }
     
    //檢查配置的最大隊列長度
    if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
     .......
    }
  }
  ......
}
 
//判斷包含多個線程池的數組中的各個線程池的配置是否正確
static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
{
  ....
  ngx_thread_pool_t **tpp;
 
  tpp = tcf->pools.elts;
  //遍歷數組中所有的線程池配置,并檢查其正確性
  for (i = 0; i < tcf->pools.nelts; i++) {
    .....
  }
 
  return NGX_CONF_OK;
}

     在上述的流程走完之后,nginx的master就保存了一份所有線程池的配置(tcf->pools),這份配置在創建worker時也會被繼承。然后每個worker中都調用各個核心模塊的init_process函數(如果有的話)。

/******************* nginx/src/core/ngx_thread_pool.c ************************/
//創建線程池所需的基礎結構
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
{
  ngx_uint_t        i;
  ngx_thread_pool_t    **tpp;
  ngx_thread_pool_conf_t  *tcf;
  //如果不是worker或者只有一個worker就不起用線程池
  if (ngx_process != NGX_PROCESS_WORKER
    && ngx_process != NGX_PROCESS_SINGLE)
  {
    return NGX_OK;
  }
   
  //初始化任務隊列
  ngx_thread_pool_queue_init(&ngx_thread_pool_done);
 
  tpp = tcf->pools.elts;
  for (i = 0; i < tcf->pools.nelts; i++) {
    //初始化各個線程池
    if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
      return NGX_ERROR;
    }
  }
 
  return NGX_OK;
}
 
//線程池初始化
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
  .....
  //初始化任務隊列
  ngx_thread_pool_queue_init(&tp->queue);
 
  //創建線程鎖
  if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
    return NGX_ERROR;
  }
 
  //創建線程條件變量
  if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
    (void) ngx_thread_mutex_destroy(&tp->mtx, log);
    return NGX_ERROR;
  }
  ......
  for (n = 0; n < tp->threads; n++) {
    //創建線程池中的每個線程
    err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
    if (err) {
      ngx_log_error(NGX_LOG_ALERT, log, err,
             "pthread_create() failed");
      return NGX_ERROR;
    }
  }
  ......
}
 
//線程池中線程處理主函數
static void *ngx_thread_pool_cycle(void *data)
{
   ......
   for ( ;; ) {
    //阻塞的方式獲取線程鎖
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
      return NULL;
    }
 
    /* the number may become negative */
    tp->waiting--;
 
    //如果任務隊列為空,就cond_wait阻塞等待有新任務時調用cond_signal/broadcast觸發
    while (tp->queue.first == NULL) {
      if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
        != NGX_OK)
      {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NULL;
      }
    }
    //從任務隊列中獲取task,并將其從隊列中移除
    task = tp->queue.first;
    tp->queue.first = task->next;
 
    if (tp->queue.first == NULL) {
      tp->queue.last = &tp->queue.first;
    }
 
    if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
      return NULL;
    }
    ......
    //task的處理函數
    task->handler(task->ctx, tp->log);
    .....
 
    ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
 
    //將經過預處理的任務添加到done隊列中等待調用event的回調函數繼續處理
    *ngx_thread_pool_done.last = task;
    ngx_thread_pool_done.last = &task->next;
     
    //防止編譯器優化,保證解鎖操作是在上述語句執行完畢后再去執行的
    ngx_memory_barrier();
 
    ngx_unlock(&ngx_thread_pool_done_lock);
     
    (void) ngx_notify(ngx_thread_pool_handler);
  }
}
 
//處理pool_done隊列上task中包含的每個event事件
static void ngx_thread_pool_handler(ngx_event_t *ev)
{
  .....
  ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
 
  //獲取任務鏈表的頭部
  task = ngx_thread_pool_done.first;
  ngx_thread_pool_done.first = NULL;
  ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
 
  ngx_memory_barrier();
 
  ngx_unlock(&ngx_thread_pool_done_lock);
 
  while (task) {
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
            "run completion handler for task #%ui", task->id);
    //遍歷隊列中的所有任務事件
    event = &task->event;
    task = task->next;
 
    event->complete = 1;
    event->active = 0;
 
    //調用event對應的處理函數有針對性的進行處理
    event->handler(event);
  }
}
 

三、thread_pool線程池使用示例

     根據之前所講到的,nginx中的線程池主要是用于操作文件的IO操作。所以,在nginx中自帶的模塊ngx_http_file_cache.c文件中看到了線程池的使用。

/*********************** nginx/src/os/unix/ngx_files.c **********************/
//file_cache模塊的處理函數(涉及到了線程池)
static ssize_t ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c)
{
  .......
#if (NGX_THREADS)
 
  if (clcf->aio == NGX_HTTP_AIO_THREADS) {
    c->file.thread_task = c->thread_task;
    //這里注冊的函數在下面語句中的ngx_thread_read函數中被調用
    c->file.thread_handler = ngx_http_cache_thread_handler;
    c->file.thread_ctx = r;
    //根據任務的屬性,選擇正確的線程池,并初始化task結構體中的各個成員    
    n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);
 
    c->thread_task = c->file.thread_task;
    c->reading = (n == NGX_AGAIN);
 
    return n;
  }
#endif
 
  return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);
}
 
 
//task任務的處理函數
static ngx_int_t ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
  .......
  tp = clcf->thread_pool;
  .......
   
  task->event.data = r;
  //注冊thread_event_handler函數,該函數在處理pool_done隊列中event事件時被調用
  task->event.handler = ngx_http_cache_thread_event_handler;
 
  //將任務放到線程池的任務隊列中
  if (ngx_thread_task_post(tp, task) != NGX_OK) {
    return NGX_ERROR;
  }
  ......
}
 
/*********************** nginx/src/core/ngx_thread_pool.c **********************/
//添加任務到隊列中
ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
  //如果當前的任務正在處理就退出
  if (task->event.active) {
    ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
           "task #%ui already active", task->id);
    return NGX_ERROR;
  }
 
  if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
    return NGX_ERROR;
  }
   
  //判斷當前線程池等待的任務數量與最大隊列長度的關系
  if (tp->waiting >= tp->max_queue) {
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
 
    ngx_log_error(NGX_LOG_ERR, tp->log, 0,
           "thread pool \"%V\" queue overflow: %i tasks waiting",
           &tp->name, tp->waiting);
    return NGX_ERROR;
  }
  //激活任務
  task->event.active = 1;
 
  task->id = ngx_thread_pool_task_id++;
  task->next = NULL;
   
  //通知阻塞的線程有新事件加入,可以解除阻塞
  if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    return NGX_ERROR;
  }
 
  *tp->queue.last = task;
  tp->queue.last = &task->next;
 
  tp->waiting++;
 
  (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
 
  ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
          "task #%ui added to thread pool \"%V\"",
          task->id, &tp->name);
 
  return NGX_OK;
}

    上面示例基本展示了nginx目前對線程池的使用方法,采用線程池來處理IO這類慢速操作可以提升worker的主線程的執行效率。當然,用戶自己在開發模塊時,也可以參照file_cache模塊中使用線程池的方法來調用多線程提升程序性能。(歡迎大家多多批評指正)

感謝閱讀,希望能幫助到大家,謝謝大家對本站的支持!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

南召县| 千阳县| 民权县| 广元市| 武穴市| 灵武市| 延安市| 普宁市| 丹江口市| 溆浦县| 宝山区| 萝北县| 台中县| 紫金县| 高邑县| 阳新县| 库尔勒市| 黎川县| 甘洛县| 页游| 义乌市| 富宁县| 定结县| 淳化县| 榕江县| 资中县| 栾川县| 顺昌县| 盐城市| 邯郸市| 涞源县| 措勤县| 固始县| 张家口市| 临夏县| 陆丰市| 黄山市| 北票市| 息烽县| 无锡市| 翁源县|