引言:Nginx模块提供了upstream(上游服务器)的异步回调处理模块开发,以memcached 模块为例
commands结构的初始化为相应的命令添加回调函数,用来处理 memcached_pass 这个命令
1 static ngx_command_t ngx_http_memcached_commands[] = { 2 3 { ngx_string("memcached_pass"), 4 NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, 5 ngx_http_memcached_pass, // 这里添加回调 6 NGX_HTTP_LOC_CONF_OFFSET, 7 0, 8 NULL }, 9 ..... 10 }
看看 ngx_http_memcached_pass 的实现:
此函数会对 conf 做一些初始化操作
static char *
ngx_http_memcached_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_memcached_loc_conf_t *mlcf = conf;
ngx_str_t *value;
ngx_url_t u;
ngx_http_core_loc_conf_t *clcf;
if (mlcf->upstream.upstream) {
return "is duplicate";
}
value = cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1];
u.no_resolve = 1;
mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
if (mlcf->upstream.upstream == NULL) {
return NGX_CONF_ERROR;
}
clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
clcf->handler = ngx_http_memcached_handler; //为此请求添加回调
if (clcf->name.data[clcf->name.len - 1] == '/') {
clcf->auto_redirect = 1;
}
mlcf->index = ngx_http_get_variable_index(cf, &ngx_http_memcached_key);
if (mlcf->index == NGX_ERROR) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
整个模块的定义:
ngx_http_memcached_commands 是传入的变量
ngx_http_memcached_module_ctx 是模块的 context 结构
此结构存储了所有的为配置信息初始化的回调函数
1 ngx_module_t ngx_http_memcached_module = { 2 NGX_MODULE_V1, 3 &ngx_http_memcached_module_ctx, /* module context */ 4 ngx_http_memcached_commands, /* module directives */ 5 NGX_HTTP_MODULE, /* module type */ 6 NULL, /* init master */ 7 NULL, /* init module */ 8 NULL, /* init process */ 9 NULL, /* init thread */ 10 NULL, /* exit thread */ 11 NULL, /* exit process */ 12 NULL, /* exit master */ 13 NGX_MODULE_V1_PADDING 14 };
1 static ngx_http_module_t ngx_http_memcached_module_ctx = { 2 NULL, /* preconfiguration */ 3 NULL, /* postconfiguration */ 4 5 NULL, /* create main configuration */ 6 NULL, /* init main configuration */ 7 8 NULL, /* create server configuration */ 9 NULL, /* merge server configuration */ 10 11 ngx_http_memcached_create_loc_conf, /* create location configration */ 12 ngx_http_memcached_merge_loc_conf /* merge location configration */ 13 };
回调 ngx_http_memcached_create_loc_conf 用来为配置信息分配存储空间并初始化
以后需要使用到配置信息的参数的时候使用方式如下:
ngx_http_memcached_loc_conf_t *mlcf; mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);
其中 ngx_http_memcached_loc_conf_t 是自己定义的配置信息的结构
memcached 模块的定义如下:
1 typedef struct { 2 ngx_http_upstream_conf_t upstream; 3 ngx_int_t index; 4 } ngx_http_memcached_loc_conf_t;
这些都是初始化的操作,看看核心函数:
ngx_http_memcached_handler
1 static ngx_int_t 2 ngx_http_memcached_handler(ngx_http_request_t *r) 3 { 4 ngx_int_t rc; 5 ngx_http_upstream_t *u; 6 ngx_http_memcached_ctx_t *ctx; 7 ngx_http_memcached_loc_conf_t *mlcf; 8 9 if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_HEAD))) { 10 return NGX_HTTP_NOT_ALLOWED; 11 } 12 13 rc = ngx_http_discard_request_body(r); 14 15 if (rc != NGX_OK) { 16 return rc; 17 } 18 19 if (ngx_http_set_content_type(r) != NGX_OK) { 20 return NGX_HTTP_INTERNAL_SERVER_ERROR; 21 } 22 // 创建一个 upstream 23 if (ngx_http_upstream_create(r) != NGX_OK) { 24 return NGX_HTTP_INTERNAL_SERVER_ERROR; 25 } 26 //取得这个 upstream 27 u = r->upstream; 28 29 ngx_str_set(&u->schema, "memcached://"); 30 u->output.tag = (ngx_buf_tag_t) &ngx_http_memcached_module; 31 32 ngx_http_memcached_loc_conf_t *mlcf; 33 mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module); 34 35 u->conf = &mlcf->upstream; 36 //为 upstream 添加回调 37 u->create_request = ngx_http_memcached_create_request; //创建到 upstream的后端请求 38 u->reinit_request = ngx_http_memcached_reinit_request; //失败后再次创建 39 u->process_header = ngx_http_memcached_process_header; //处理来自后端的数据 40 u->abort_request = ngx_http_memcached_abort_request; //放弃数据 41 u->finalize_request = ngx_http_memcached_finalize_request; //析构这个请求 42 43 ctx = ngx_palloc(r->pool, sizeof(ngx_http_memcached_ctx_t)); 44 if (ctx == NULL) { 45 return NGX_HTTP_INTERNAL_SERVER_ERROR; 46 } 47 48 ctx->rest = NGX_HTTP_MEMCACHED_END; 49 ctx->request = r; 50 51 ngx_http_set_ctx(r, ctx, ngx_http_memcached_module); 52 53 u->input_filter_init = ngx_http_memcached_filter_init; 54 u->input_filter = ngx_http_memcached_filter; 55 u->input_filter_ctx = ctx; 56 57 r->main->count++; 58 59 ngx_http_upstream_init(r); 60 61 return NGX_DONE; 62 }
其中可以看到, 大部分的回调都是传入参数 ngx_http_request_t *r
1 static ngx_int_t ngx_http_memcached_create_request(ngx_http_request_t *r); 2 static ngx_int_t ngx_http_memcached_reinit_request(ngx_http_request_t *r); 3 static ngx_int_t ngx_http_memcached_process_header(ngx_http_request_t *r); 4 static ngx_int_t ngx_http_memcached_filter_init(void *data); 5 static ngx_int_t ngx_http_memcached_filter(void *data, ssize_t bytes); 6 static void ngx_http_memcached_abort_request(ngx_http_request_t *r); 7 static void ngx_http_memcached_finalize_request(ngx_http_request_t *r, 8 ngx_int_t rc);
函数处理时候会通过 r 取得 upstream并做一番”加工“
create_request 的时候会有 r -> upstream = u ;
设置完这几个回调函数以后,就会为相应的请求添加这个回调,来自下游的服务器的请求会被分发到upstream中去,但是Nginx在本身的转发过程中是不阻塞的