公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
终极目标:做个车联网网关

[图片上传中...(11(50).png-3a0a70-1562205821153-0)]
xdrive.rar ](http://blog.chinaunix.net/attachment/attach/26/44/39/21264439210d08ecda94a90e0ccf70f678244cb508.rar)
注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;
nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:
1. CPP = g++
2. LINK = \$(CPP) ##采用g++来链接
1. ##line=338 below was changed by kevin_zhong on 2011-11-14
3. ngx_obj=`echo $ngx_obj \
4. | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
5. -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
6. -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
7. -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`
9. ngx_post_suffix=`echo $ngx_src \
10. | sed -e "s#^.*\(\.c\)\\$#\1#g" \
11. -e "s#^.*\(\.cc\)\\$#\1#g" \
12. -e "s#^.*\(\.cpp\)\\$#\1#g"`
14. if [ "$ngx_post_suffix"x = ".cpp"x ];then
15. ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
16. else
17. ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
18. fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;
下面是具体代码分析:
1. /*
2. * Copyright (C) Igor Sysoev; kevin_zhong
3. * mail: qq2000zhong@gmail.com
4. * date: 2011-11-13
5. */
7. //因是cpp文件,固包含c头文件需要 extern c
8. extern "C" {
9. #include <ngx_config.h>
10. #include <ngx_core.h>
11. #include <ngx_http.h>
12. #include "ngx_chain_util.h"
13. }
15. //与服务器内部通信二进制协议实现
16. #include "ngx_thrift_transport.h"
17. #include "ngx_xdrive_datagram.h"
18. #include "protocal/rc_updator_types.h"
20. using namespace xdrive::msg::rc_updator;
21. using namespace xdrive;
23. /*
24. * 扩展模块需要3个业务相关输入变量,uid,path,recusive
25. * 参考nginx.conf中的配置写法
26. */
28. typedef struct
29. {
30. ngx_http_upstream_conf_t upstream;
32. //将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
33. ngx_int_t uid_index;
34. ngx_int_t path_index;
35. ngx_int_t recusive_index;
36. }
37. ngx_http_xdrive_rc_loc_conf_t;
39. /*
40. * 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
41. *
42. * 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
43. * 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
44. * 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
45. * 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
46. *
47. * 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
48. * 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
49. *
50. * 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
51. */
53. typedef struct
54. {
55. ngx_http_request_t *request;
56. ngx_chain_pair_t body_buff;
57. ngx_chain_t * tail_buff;
58. uint64_t uid;
59. ngx_str_t path;
60. bool recusive;
62. //后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
63. //开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
64. //upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
65. //的 content len,导致接受http包体数据异常...
66. //参考 ngx_http_upstream.c:2391
67. int rest_length;
68. }
69. ngx_http_xdrive_rc_ctx_t;
72. static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
73. static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
74. static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
75. static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
76. static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
77. static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
78. static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
79. static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);
81. static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
82. static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
84. static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
87. static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
88. { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
89. { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
90. { ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
91. { ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
92. { ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
93. { ngx_null_string, 0 }
94. };
96. /*
97. * 参数设置,不可变,注意和变量的区别
98. */
99. static ngx_command_t ngx_http_xdrive_rc_commands[] = {
100. { ngx_string("xdrive_rc_pass"),
101. NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
102. ngx_http_xdrive_rc_pass,
103. NGX_HTTP_LOC_CONF_OFFSET,
104. 0,
105. NULL },
107. { ngx_string("xdrive_rc_connect_timeout"),
108. NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
109. ngx_conf_set_msec_slot,
110. NGX_HTTP_LOC_CONF_OFFSET,
111. offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
112. NULL },
114. { ngx_string("xdrive_rc_send_timeout"),
115. NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
116. ngx_conf_set_msec_slot,
117. NGX_HTTP_LOC_CONF_OFFSET,
118. offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
119. NULL },
121. { ngx_string("xdrive_rc_buffer_size"),
122. NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
123. ngx_conf_set_size_slot,
124. NGX_HTTP_LOC_CONF_OFFSET,
125. offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
126. NULL },
128. { ngx_string("xdrive_rc_read_timeout"),
129. NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
130. ngx_conf_set_msec_slot,
131. NGX_HTTP_LOC_CONF_OFFSET,
132. offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
133. NULL },
135. { ngx_string("xdrive_rc_next_upstream"),
136. NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
137. ngx_conf_set_bitmask_slot,
138. NGX_HTTP_LOC_CONF_OFFSET,
139. offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
140. &ngx_http_xdrive_rc_next_upstream_masks },
142. ngx_null_command
143. };
146. static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
147. ngx_http_xdrive_rc_add_variables, /* preconfiguration */
148. NULL, /* postconfiguration */
150. NULL, /* create main configuration */
151. NULL, /* init main configuration */
153. NULL, /* create server configuration */
154. NULL, /* merge server configuration */
156. ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
157. ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
158. };
161. ngx_module_t ngx_http_xdrive_rc_module = {
162. NGX_MODULE_V1,
163. &ngx_http_xdrive_rc_module_ctx, /* module context */
164. ngx_http_xdrive_rc_commands, /* module directives */
165. NGX_HTTP_MODULE, /* module type */
166. NULL, /* init master */
167. NULL, /* init module */
168. NULL, /* init process */
169. NULL, /* init thread */
170. NULL, /* exit thread */
171. NULL, /* exit process */
172. NULL, /* exit master */
173. NGX_MODULE_V1_PADDING
174. };
176. //业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
177. //正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
178. //因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
179. //业务需要uid放在 query_string,这时候就只需要改配置即可了
180. //思路来源于 ngx_http_memcached_module.c
182. static ngx_http_variable_t ngx_http_proxy_vars[] = {
183. { ngx_string("uid"), NULL,
184. NULL, 0,
185. NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
186. 0 },
187. { ngx_string("path"), NULL,
188. NULL, 0,
189. NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
190. 0 },
191. { ngx_string("recusive"), NULL,
192. NULL, 0,
193. NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
194. 0 },
195. { ngx_null_string, NULL,NULL,0, 0, 0 }
196. };
199. static ngx_int_t
200. ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
201. {
202. ngx_int_t rc;
203. ngx_http_upstream_t *u;
204. ngx_http_xdrive_rc_ctx_t *ctx;
205. ngx_http_xdrive_rc_loc_conf_t *mlcf;
207. if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
208. {
209. return NGX_HTTP_NOT_ALLOWED;
210. }
212. //get 请求,不需要包体
213. rc = ngx_http_discard_request_body(r);
215. if (rc != NGX_OK)
216. {
217. return rc;
218. }
220. if (ngx_http_set_content_type(r) != NGX_OK)
221. {
222. return NGX_HTTP_INTERNAL_SERVER_ERROR;
223. }
225. if (ngx_http_upstream_create(r) != NGX_OK)
226. {
227. return NGX_HTTP_INTERNAL_SERVER_ERROR;
228. }
230. u = r->upstream;
232. ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
234. u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;
236. mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
238. u->conf = &mlcf->upstream;
240. //设置回调,网上大都只讲这里
241. u->create_request = ngx_http_xdrive_rc_create_request;
242. u->reinit_request = ngx_http_xdrive_rc_reinit_request;
243. u->process_header = ngx_http_xdrive_rc_process_header;
244. u->abort_request = ngx_http_xdrive_rc_abort_request;
245. u->finalize_request = ngx_http_xdrive_rc_finalize_request;
247. //分配context内存
248. ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
249. if (ctx == NULL)
250. {
251. return NGX_HTTP_INTERNAL_SERVER_ERROR;
252. }
253. ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));
255. ctx->request = r;
257. ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);
259. u->input_filter_init = ngx_http_xdrive_rc_filter_init;
261. /*
262. * 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
263. * input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
264. * filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
265. * 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
266. */
267. u->input_filter = ngx_http_xdrive_rc_filter;
268. u->input_filter_ctx = ctx;
270. u->buffering = 0; //note, no buffering...cause too complicated !!
272. r->main->count++;
274. //不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
275. //调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
276. ngx_http_upstream_init(r);
278. return NGX_DONE;
279. }
282. static ngx_int_t
283. ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
284. {
285. size_t len;
286. ngx_buf_t *b;
287. ngx_chain_t *cl;
288. ngx_http_xdrive_rc_ctx_t *ctx;
289. ngx_http_variable_value_t *vv;
290. ngx_http_xdrive_rc_loc_conf_t *mlcf;
292. mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
294. //根据配置文件uid index号从变量中获取uid的变量值
295. vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);
297. if (vv == NULL || vv->not_found || vv->len == 0)
298. {
299. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
300. "the \"$uid\" variable is not set");
301. return NGX_ERROR;
302. }
304. ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
305. ctx->uid = ngx_atoof(vv->data, vv->len);
306. if (ctx->uid == (off_t)NGX_ERROR)
307. {
308. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
309. "the \"$uid\" variable is err %s set", vv->data);
310. return NGX_ERROR;
311. }
313. //根据配置文件path index号从变量中获取path的变量值
314. vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
315. if (vv == NULL || vv->not_found || vv->len == 0)
316. {
317. ngx_str_set(&ctx->path, "/");
318. }
319. else {
320. ctx->path.data = vv->data;
321. ctx->path.len = vv->len;
322. }
324. vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
325. if (vv == NULL || vv->not_found || vv->len == 0)
326. {
327. ctx->recusive = false;
328. }
329. else {
330. ctx->recusive = ngx_atoi(vv->data, vv->len);
331. }
333. RcUpdateReq list_req;
334. list_req._user_id = ctx->uid;
335. list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
336. list_req._recursive = ctx->recusive;
338. static uint32_t seq = ngx_time();
340. //编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
341. //细节见具体代码,不表
342. cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
343. &list_req, ++seq, 0xC01);
344. if (cl == NULL)
345. return NGX_ERROR;
347. //准备发送
348. r->upstream->request_bufs = cl;
350. ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
351. "http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
352. ctx->uid, &ctx->path, ctx->recusive);
354. return NGX_OK;
355. }
358. static ngx_int_t
359. ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
360. {
361. return NGX_OK;
362. }
364. /*
365. * 读取二进制包体头部
366. */
367. static ngx_int_t
368. ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
369. {
370. ngx_http_upstream_t *u;
371. ngx_http_xdrive_rc_ctx_t *ctx;
373. u = r->upstream;
375. //因包头固定长度,所以很好判断
376. if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
377. return NGX_AGAIN;
379. ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
381. ngx_xdrive_datagram_header_t header;
382. //解包头,获取最重要参数 : 包体长度,根据包体长度收包
383. if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
384. &header, r->connection->log) != NGX_OK)
385. {
386. return NGX_HTTP_UPSTREAM_INVALID_HEADER;
387. }
389. //业务代码
390. if (header._type != 0x08C01)
391. {
392. ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
393. "xdrive_rc ret type not legal = %d", header._type);
395. return NGX_HTTP_UPSTREAM_INVALID_HEADER;
396. }
398. //业务代码
399. if (header._status != 0)
400. {
401. ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
402. "xdrive_rc ret status not ok in response = %d", header._status);
404. return NGX_HTTP_UPSTREAM_INVALID_HEADER;
405. }
407. //非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
408. ngx_http_clear_content_length(r);
410. //因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
411. //所以我们必须自己处理记录剩余包体长度;
412. ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;
414. u->headers_in.status_n = NGX_HTTP_OK;
415. u->state->status = NGX_HTTP_OK;
417. //包头数据已经处理完毕,必须丢弃;
418. u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;
420. return NGX_OK;
421. }
424. //其实没啥用
425. static ngx_int_t
426. ngx_http_xdrive_rc_filter_init(void *data)
427. {
428. ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
430. ngx_http_upstream_t *u;
432. u = ctx->request->upstream;
434. return NGX_OK;
435. }
437. /*
438. * 缓存包体,等待包体接受完毕,解码,然后一次回复给client
439. */
440. static ngx_int_t
441. ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
442. {
443. ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
445. u_char *last;
446. ngx_buf_t *b;
447. ngx_chain_t *cl, **ll;
448. ngx_http_upstream_t *u;
450. ngx_http_xdrive_rc_loc_conf_t *mlcf;
452. mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
453. ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);
455. u = ctx->request->upstream;
456. b = &u->buffer;
458. size_t buff_size = mlcf->upstream.buffer_size;
459. //assert(bytes <= buff_size);
461. ctx->rest_length -= bytes;
463. ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
464. "recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);
466. //特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
467. //直接交互内存即可,不再需要内存拷贝,否则...
468. if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
469. {
470. cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
471. ctx->body_buff._chain_head = cl;
473. cl->buf->flush = 1;
474. cl->buf->memory = 1;
476. last = b->last;
477. cl->buf->pos = last;
478. b->last += bytes;
479. cl->buf->last = b->last;
480. cl->buf->tag = u->output.tag;
481. }
482. else {
483. //做一次内存拷贝到 body buf 中去
484. if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
485. b->last, bytes) != NGX_OK)
486. return NGX_ERROR;
488. b->last += bytes;
489. }
491. //判断upstream包体是否收完整
492. if (ctx->rest_length > 0)
493. {
494. return NGX_OK;
495. }
497. //包体收完,进行解码
498. RcUpdateResp list_resp;
499. if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
500. ctx->request->connection->log,
501. &list_resp) != NGX_OK)
502. {
503. ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
504. "xdrive_rc RcUpdateResp decode failed");
506. return NGX_ERROR;
507. }
509. ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
510. "xdrive_rc RcUpdateResp list num=%d",
511. list_resp._action_list.size());
513. //内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
514. ngx_chain_t *busy_bufs = NULL;
515. ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);
517. //transfer...
518. ngx_chain_pair_t chain_pair;
519. ngx_memzero(&chain_pair, sizeof(chain_pair));
521. //转成 json 格式
522. if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
523. "uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
524. ctx->uid, &ctx->path, ctx->recusive,
525. list_resp._weak_dcid.c_str(),
526. list_resp._used_space,
527. list_resp._action_list.size()
528. ))
529. return NGX_ERROR;
531. //转成 json 格式
532. for (size_t i = 0; i < list_resp._action_list.size(); ++i)
533. {
534. ActionThrft *ac = &list_resp._action_list[i];
535. if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
536. "[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",
537. ac->m_path.c_str(), ac->m_node_type, ac->m_status,
538. ac->m_gcid.c_str(), ac->m_file_size
539. ))
540. return NGX_ERROR;
541. }
543. //这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
544. //关后端连接,回前端包
545. chain_pair._chain_last->buf->last_buf = 1;
547. for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
548. {
549. ll = &cl->next;
550. }
551. *ll = chain_pair._chain_head;
553. return NGX_OK;
554. }
557. static void
558. ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
559. {
560. ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
561. "abort http xdrive_rc request");
562. return;
563. }
566. static void
567. ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
568. {
569. ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
570. "finalize http xdrive_rc request");
571. return;
572. }
575. static void *
576. ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
577. {
578. ngx_http_xdrive_rc_loc_conf_t *conf;
580. conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
581. sizeof(ngx_http_xdrive_rc_loc_conf_t));
582. if (conf == NULL)
583. {
584. return NULL;
585. }
587. conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
588. conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
589. conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;
591. conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;
593. /* the hardcoded values */
594. conf->upstream.cyclic_temp_file = 0;
595. conf->upstream.buffering = 0;
596. conf->upstream.ignore_client_abort = 0;
597. conf->upstream.send_lowat = 0;
598. conf->upstream.bufs.num = 0;
599. conf->upstream.busy_buffers_size = 0;
600. conf->upstream.max_temp_file_size = 0;
601. conf->upstream.temp_file_write_size = 0;
602. conf->upstream.intercept_errors = 1;
603. conf->upstream.intercept_404 = 1;
604. conf->upstream.pass_request_headers = 0;
605. conf->upstream.pass_request_body = 0;
607. conf->uid_index = NGX_CONF_UNSET;
608. conf->path_index = NGX_CONF_UNSET;
609. conf->recusive_index = NGX_CONF_UNSET;
611. return conf;
612. }
615. static char *
616. ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
617. {
618. ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
619. ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;
621. ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
622. prev->upstream.connect_timeout, 60000);
624. ngx_conf_merge_msec_value(conf->upstream.send_timeout,
625. prev->upstream.send_timeout, 60000);
627. ngx_conf_merge_msec_value(conf->upstream.read_timeout,
628. prev->upstream.read_timeout, 60000);
630. ngx_conf_merge_size_value(conf->upstream.buffer_size,
631. prev->upstream.buffer_size,
632. (size_t)ngx_pagesize);
634. ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
635. prev->upstream.next_upstream,
636. (NGX_CONF_BITMASK_SET
637. | NGX_HTTP_UPSTREAM_FT_ERROR
638. | NGX_HTTP_UPSTREAM_FT_TIMEOUT));
640. if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
641. {
642. conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
643. | NGX_HTTP_UPSTREAM_FT_OFF;
644. }
646. if (conf->upstream.upstream == NULL)
647. {
648. conf->upstream.upstream = prev->upstream.upstream;
649. }
651. if (conf->uid_index == NGX_CONF_UNSET) {
652. conf->uid_index = prev->uid_index;
653. }
654. if (conf->path_index == NGX_CONF_UNSET) {
655. conf->path_index = prev->path_index;
656. }
657. if (conf->recusive_index == NGX_CONF_UNSET) {
658. conf->recusive_index = prev->recusive_index;
659. }
661. return NGX_CONF_OK;
662. }
665. static char *
666. ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
667. {
668. ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;
670. ngx_str_t *value;
671. ngx_url_t u;
672. ngx_http_core_loc_conf_t *clcf;
674. if (mlcf->upstream.upstream)
675. {
676. return "is duplicate";
677. }
679. value = (ngx_str_t *)cf->args->elts;
681. ngx_memzero(&u, sizeof(ngx_url_t));
683. u.url = value[1];
684. u.no_resolve = 1;
686. mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
687. if (mlcf->upstream.upstream == NULL)
688. {
689. return (char *)(NGX_CONF_ERROR);
690. }
692. clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
694. clcf->handler = ngx_http_xdrive_rc_handler;
696. if (clcf->name.data[clcf->name.len - 1] == '/')
697. {
698. clcf->auto_redirect = 1;
699. }
701. //保存变量index用
702. mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);
703. if (mlcf->uid_index == NGX_ERROR)
704. {
705. return (char *)(NGX_CONF_ERROR);
706. }
707. mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);
708. if (mlcf->path_index == NGX_ERROR)
709. {
710. return (char *)(NGX_CONF_ERROR);
711. }
712. mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);
713. if (mlcf->recusive_index == NGX_ERROR)
714. {
715. return (char *)(NGX_CONF_ERROR);
716. }
718. return NGX_CONF_OK;
719. }
723. static ngx_int_t
724. ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
725. {
726. ngx_http_variable_t *var, *v;
728. for (v = ngx_http_proxy_vars; v->name.len; v++)
729. {
730. var = ngx_http_add_variable(cf, &v->name, v->flags);
731. if (var == NULL)
732. {
733. return NGX_ERROR;
734. }
736. var->get_handler = v->get_handler;
737. var->data = v->data;
738. }
740. return NGX_OK;
741. }
代码中一些有意思的地方:
2. //和buf差不多的思想的 buf chain
3. typedef struct
4. {
5. ngx_chain_t* _chain_head;
6. ngx_chain_t* _chain_pos;
7. ngx_chain_t* _chain_last;
8. ngx_chain_t* _chain_tail;
9. }
10. ngx_chain_pair_t;
12. //从buf chain中读取len长内存出来
13. size_t ngx_cdecl
14. ngx_chain_read(ngx_chain_pair_t* chain_pair
15. , uint8_t *buf, uint32_t len);
//将buf写入到buf chain中
1. ngx_int_t ngx_cdecl
2. ngx_chain_write(ngx_pool_t* pool
3. , ngx_chain_t** free_bufs
4. , ngx_chain_pair_t* chain_pair
5. , size_t write_chunk_size
6. , const uint8_t *buf, uint32_t len);
//写json或者xml之类回复有用
1. ngx_int_t ngx_cdecl
2. ngx_chain_sprintf(ngx_pool_t *pool
3. , ngx_chain_t **free_bufs
4. , ngx_chain_pair_t *chain_pair
5. , size_t write_chunk_size
6. , const char *fmt, ...);
下面是nginx配置文件中的关键部分
1. location ~* /rc_list/([0-9]+).html$ {
2. xdrive_rc_buffer_size 4096;
3. set $uid $1;
4. set $path /;
5. set $recusive 0;
6. if ($query_string ~* (|&)recusive=(0|1)(|&)) {
7. set $recusive $2;
8. }
9. xdrive_rc_pass 127.0.0.1:11001;
10. }
- 解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
- recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
- 思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
网友评论