美文网首页
nginx 自定义协议 扩展模块开发

nginx 自定义协议 扩展模块开发

作者: 达微 | 来源:发表于2019-07-04 10:04 被阅读0次

公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
终极目标:做个车联网网关


11(50).png

[图片上传中...(11(50).png-3a0a70-1562205821153-0)]

xdrive.rar ](http://blog.chinaunix.net/attachment/attach/26/44/39/21264439210d08ecda94a90e0ccf70f678244cb508.rar)

  注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;

  nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:
1.  CPP = g++
2.  LINK = \$(CPP) ##采用g++来链接

1.  ##line=338 below was changed by kevin_zhong on 2011-11-14 

3.  ngx_obj=`echo $ngx_obj \

4.  | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \

5.  -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \

6.  -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \

7.  -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`

9.  ngx_post_suffix=`echo $ngx_src \

10.  | sed -e "s#^.*\(\.c\)\\$#\1#g" \

11.  -e "s#^.*\(\.cc\)\\$#\1#g" \

12.  -e "s#^.*\(\.cpp\)\\$#\1#g"`

14.  if [ "$ngx_post_suffix"x = ".cpp"x ];then

15.  ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"

16.  else

17.  ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"

18.  fi

上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;

下面是具体代码分析:

1.  /* 
2.  * Copyright (C) Igor Sysoev; kevin_zhong

3.  * mail: qq2000zhong@gmail.com

4.  * date: 2011-11-13

5.  */

7.  //因是cpp文件,固包含c头文件需要 extern c

8.  extern "C" {

9.  #include <ngx_config.h>

10.  #include <ngx_core.h>

11.  #include <ngx_http.h>

12.  #include "ngx_chain_util.h"

13.  }

15.  //与服务器内部通信二进制协议实现

16.  #include "ngx_thrift_transport.h"

17.  #include "ngx_xdrive_datagram.h"

18.  #include "protocal/rc_updator_types.h"

20.  using namespace xdrive::msg::rc_updator;

21.  using namespace xdrive;

23.  /*

24.  * 扩展模块需要3个业务相关输入变量,uid,path,recusive

25.  * 参考nginx.conf中的配置写法

26.  */

28.  typedef struct

29.  {

30.  ngx_http_upstream_conf_t upstream;

32.  //将uid和path以及recusive在配置中的index找出来,以后create request的时候需要

33.  ngx_int_t uid_index;

34.  ngx_int_t path_index;

35.  ngx_int_t recusive_index;

36.  }

37.  ngx_http_xdrive_rc_loc_conf_t;

39.  /*

40.  * 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:

41.  *

42.  * 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,

43.  * 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接

44.  * 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,

45.  * 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;

46.  *

47.  * 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)

48.  * 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client

49.  *

50.  * 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;

51.  */

53.  typedef struct

54.  {

55.  ngx_http_request_t *request;

56.  ngx_chain_pair_t body_buff;

57.  ngx_chain_t * tail_buff;

58.  uint64_t uid;

59.  ngx_str_t path;

60.  bool recusive;

62.  //后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的

63.  //开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n

64.  //upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确

65.  //的 content len,导致接受http包体数据异常...

66.  //参考 ngx_http_upstream.c:2391

67.  int rest_length;

68.  }

69.  ngx_http_xdrive_rc_ctx_t;

72.  static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);

73.  static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);

74.  static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);

75.  static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);

76.  static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);

77.  static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);

78.  static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);

79.  static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

81.  static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);

82.  static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);

84.  static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);

87.  static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {

88.  { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },

89.  { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },

90.  { ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },

91.  { ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },

92.  { ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },

93.  { ngx_null_string, 0 }

94.  };

96.  /*

97.  * 参数设置,不可变,注意和变量的区别

98.  */

99.  static ngx_command_t ngx_http_xdrive_rc_commands[] = {

100.  { ngx_string("xdrive_rc_pass"),

101.  NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,

102.  ngx_http_xdrive_rc_pass,

103.  NGX_HTTP_LOC_CONF_OFFSET,

104.  0,

105.  NULL },

107.  { ngx_string("xdrive_rc_connect_timeout"),

108.  NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,

109.  ngx_conf_set_msec_slot,

110.  NGX_HTTP_LOC_CONF_OFFSET,

111.  offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),

112.  NULL },

114.  { ngx_string("xdrive_rc_send_timeout"),

115.  NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,

116.  ngx_conf_set_msec_slot,

117.  NGX_HTTP_LOC_CONF_OFFSET,

118.  offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),

119.  NULL },

121.  { ngx_string("xdrive_rc_buffer_size"),

122.  NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,

123.  ngx_conf_set_size_slot,

124.  NGX_HTTP_LOC_CONF_OFFSET,

125.  offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),

126.  NULL },

128.  { ngx_string("xdrive_rc_read_timeout"),

129.  NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,

130.  ngx_conf_set_msec_slot,

131.  NGX_HTTP_LOC_CONF_OFFSET,

132.  offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),

133.  NULL },

135.  { ngx_string("xdrive_rc_next_upstream"),

136.  NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,

137.  ngx_conf_set_bitmask_slot,

138.  NGX_HTTP_LOC_CONF_OFFSET,

139.  offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),

140.  &ngx_http_xdrive_rc_next_upstream_masks },

142.  ngx_null_command

143.  };

146.  static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {

147.  ngx_http_xdrive_rc_add_variables, /* preconfiguration */

148.  NULL, /* postconfiguration */

150.  NULL, /* create main configuration */

151.  NULL, /* init main configuration */

153.  NULL, /* create server configuration */

154.  NULL, /* merge server configuration */

156.  ngx_http_xdrive_rc_create_loc_conf, /* create location configration */

157.  ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */

158.  };

161.  ngx_module_t ngx_http_xdrive_rc_module = {

162.  NGX_MODULE_V1,

163.  &ngx_http_xdrive_rc_module_ctx, /* module context */

164.  ngx_http_xdrive_rc_commands, /* module directives */

165.  NGX_HTTP_MODULE, /* module type */

166.  NULL, /* init master */

167.  NULL, /* init module */

168.  NULL, /* init process */

169.  NULL, /* init thread */

170.  NULL, /* exit thread */

171.  NULL, /* exit process */

172.  NULL, /* exit master */

173.  NGX_MODULE_V1_PADDING

174.  };

176.  //业务相关变量,get_handler = NULL,因为这三个是从conf里面通过

177.  //正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了

178.  //因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果

179.  //业务需要uid放在 query_string,这时候就只需要改配置即可了

180.  //思路来源于 ngx_http_memcached_module.c

182.  static ngx_http_variable_t ngx_http_proxy_vars[] = {

183.  { ngx_string("uid"), NULL,

184.  NULL, 0,

185.  NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,

186.  0 },

187.  { ngx_string("path"), NULL,

188.  NULL, 0,

189.  NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,

190.  0 },

191.  { ngx_string("recusive"), NULL,

192.  NULL, 0,

193.  NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,

194.  0 },

195.  { ngx_null_string, NULL,NULL,0, 0, 0 }

196.  };

199.  static ngx_int_t

200.  ngx_http_xdrive_rc_handler(ngx_http_request_t *r)

201.  {

202.  ngx_int_t rc;

203.  ngx_http_upstream_t *u;

204.  ngx_http_xdrive_rc_ctx_t *ctx;

205.  ngx_http_xdrive_rc_loc_conf_t *mlcf;

207.  if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))

208.  {

209.  return NGX_HTTP_NOT_ALLOWED;

210.  }

212.  //get 请求,不需要包体

213.  rc = ngx_http_discard_request_body(r);

215.  if (rc != NGX_OK)

216.  {

217.  return rc;

218.  }

220.  if (ngx_http_set_content_type(r) != NGX_OK)

221.  {

222.  return NGX_HTTP_INTERNAL_SERVER_ERROR;

223.  }

225.  if (ngx_http_upstream_create(r) != NGX_OK)

226.  {

227.  return NGX_HTTP_INTERNAL_SERVER_ERROR;

228.  }

230.  u = r->upstream;

232.  ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用

234.  u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;

236.  mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

238.  u->conf = &mlcf->upstream;

240.  //设置回调,网上大都只讲这里

241.  u->create_request = ngx_http_xdrive_rc_create_request;

242.  u->reinit_request = ngx_http_xdrive_rc_reinit_request;

243.  u->process_header = ngx_http_xdrive_rc_process_header;

244.  u->abort_request = ngx_http_xdrive_rc_abort_request;

245.  u->finalize_request = ngx_http_xdrive_rc_finalize_request;

247.  //分配context内存

248.  ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));

249.  if (ctx == NULL)

250.  {

251.  return NGX_HTTP_INTERNAL_SERVER_ERROR;

252.  }

253.  ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));

255.  ctx->request = r;

257.  ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);

259.  u->input_filter_init = ngx_http_xdrive_rc_filter_init;

261.  /*

262.  * 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的

263.  * input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认

264.  * filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们

265.  * 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client

266.  */

267.  u->input_filter = ngx_http_xdrive_rc_filter;

268.  u->input_filter_ctx = ctx;

270.  u->buffering = 0; //note, no buffering...cause too complicated !!

272.  r->main->count++;

274.  //不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要

275.  //调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);

276.  ngx_http_upstream_init(r);

278.  return NGX_DONE;

279.  }

282.  static ngx_int_t

283.  ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)

284.  {

285.  size_t len;

286.  ngx_buf_t *b;

287.  ngx_chain_t *cl;

288.  ngx_http_xdrive_rc_ctx_t *ctx;

289.  ngx_http_variable_value_t *vv;

290.  ngx_http_xdrive_rc_loc_conf_t *mlcf;

292.  mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

294.  //根据配置文件uid index号从变量中获取uid的变量值

295.  vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);

297.  if (vv == NULL || vv->not_found || vv->len == 0)

298.  {

299.  ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,

300.  "the \"$uid\" variable is not set");

301.  return NGX_ERROR;

302.  }

304.  ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);

305.  ctx->uid = ngx_atoof(vv->data, vv->len);

306.  if (ctx->uid == (off_t)NGX_ERROR)

307.  {

308.  ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,

309.  "the \"$uid\" variable is err %s set", vv->data);

310.  return NGX_ERROR;

311.  }

313.  //根据配置文件path index号从变量中获取path的变量值

314.  vv = ngx_http_get_indexed_variable(r, mlcf->path_index);

315.  if (vv == NULL || vv->not_found || vv->len == 0)

316.  {

317.  ngx_str_set(&ctx->path, "/");

318.  }

319.  else {

320.  ctx->path.data = vv->data;

321.  ctx->path.len = vv->len;

322.  }

324.  vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);

325.  if (vv == NULL || vv->not_found || vv->len == 0)

326.  {

327.  ctx->recusive = false;

328.  }

329.  else {

330.  ctx->recusive = ngx_atoi(vv->data, vv->len);

331.  }

333.  RcUpdateReq list_req;

334.  list_req._user_id = ctx->uid;

335.  list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);

336.  list_req._recursive = ctx->recusive;

338.  static uint32_t seq = ngx_time();

340.  //编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;

341.  //细节见具体代码,不表

342.  cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,

343.  &list_req, ++seq, 0xC01);

344.  if (cl == NULL)

345.  return NGX_ERROR;

347.  //准备发送

348.  r->upstream->request_bufs = cl;

350.  ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

351.  "http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",

352.  ctx->uid, &ctx->path, ctx->recusive);

354.  return NGX_OK;

355.  }

358.  static ngx_int_t

359.  ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)

360.  {

361.  return NGX_OK;

362.  }

364.  /*

365.  * 读取二进制包体头部

366.  */

367.  static ngx_int_t

368.  ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)

369.  {

370.  ngx_http_upstream_t *u;

371.  ngx_http_xdrive_rc_ctx_t *ctx;

373.  u = r->upstream;

375.  //因包头固定长度,所以很好判断

376.  if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)

377.  return NGX_AGAIN;

379.  ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);

381.  ngx_xdrive_datagram_header_t header;

382.  //解包头,获取最重要参数 : 包体长度,根据包体长度收包

383.  if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,

384.  &header, r->connection->log) != NGX_OK)

385.  {

386.  return NGX_HTTP_UPSTREAM_INVALID_HEADER;

387.  }

389.  //业务代码

390.  if (header._type != 0x08C01)

391.  {

392.  ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,

393.  "xdrive_rc ret type not legal = %d", header._type);

395.  return NGX_HTTP_UPSTREAM_INVALID_HEADER;

396.  }

398.  //业务代码

399.  if (header._status != 0)

400.  {

401.  ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,

402.  "xdrive_rc ret status not ok in response = %d", header._status);

404.  return NGX_HTTP_UPSTREAM_INVALID_HEADER;

405.  }

407.  //非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;

408.  ngx_http_clear_content_length(r);

410.  //因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)

411.  //所以我们必须自己处理记录剩余包体长度;

412.  ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;

414.  u->headers_in.status_n = NGX_HTTP_OK;

415.  u->state->status = NGX_HTTP_OK;

417.  //包头数据已经处理完毕,必须丢弃;

418.  u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;

420.  return NGX_OK;

421.  }

424.  //其实没啥用

425.  static ngx_int_t

426.  ngx_http_xdrive_rc_filter_init(void *data)

427.  {

428.  ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

430.  ngx_http_upstream_t *u;

432.  u = ctx->request->upstream;

434.  return NGX_OK;

435.  }

437.  /*

438.  * 缓存包体,等待包体接受完毕,解码,然后一次回复给client

439.  */

440.  static ngx_int_t

441.  ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)

442.  {

443.  ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

445.  u_char *last;

446.  ngx_buf_t *b;

447.  ngx_chain_t *cl, **ll;

448.  ngx_http_upstream_t *u;

450.  ngx_http_xdrive_rc_loc_conf_t *mlcf;

452.  mlcf = (ngx_http_xdrive_rc_loc_conf_t *)

453.  ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);

455.  u = ctx->request->upstream;

456.  b = &u->buffer;

458.  size_t buff_size = mlcf->upstream.buffer_size;

459.  //assert(bytes <= buff_size);

461.  ctx->rest_length -= bytes;

463.  ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,

464.  "recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);

466.  //特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候

467.  //直接交互内存即可,不再需要内存拷贝,否则...

468.  if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)

469.  {

470.  cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);

471.  ctx->body_buff._chain_head = cl;

473.  cl->buf->flush = 1;

474.  cl->buf->memory = 1;

476.  last = b->last;

477.  cl->buf->pos = last;

478.  b->last += bytes;

479.  cl->buf->last = b->last;

480.  cl->buf->tag = u->output.tag;

481.  }

482.  else {

483.  //做一次内存拷贝到 body buf 中去

484.  if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,

485.  b->last, bytes) != NGX_OK)

486.  return NGX_ERROR;

488.  b->last += bytes;

489.  }

491.  //判断upstream包体是否收完整

492.  if (ctx->rest_length > 0)

493.  {

494.  return NGX_OK;

495.  }

497.  //包体收完,进行解码

498.  RcUpdateResp list_resp;

499.  if (ngx_datagram_decode_body(ctx->body_buff._chain_head,

500.  ctx->request->connection->log,

501.  &list_resp) != NGX_OK)

502.  {

503.  ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,

504.  "xdrive_rc RcUpdateResp decode failed");

506.  return NGX_ERROR;

507.  }

509.  ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,

510.  "xdrive_rc RcUpdateResp list num=%d",

511.  list_resp._action_list.size());

513.  //内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用

514.  ngx_chain_t *busy_bufs = NULL;

515.  ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);

517.  //transfer...

518.  ngx_chain_pair_t chain_pair;

519.  ngx_memzero(&chain_pair, sizeof(chain_pair));

521.  //转成 json 格式

522.  if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,

523.  "uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",

524.  ctx->uid, &ctx->path, ctx->recusive,

525.  list_resp._weak_dcid.c_str(),

526.  list_resp._used_space,

527.  list_resp._action_list.size()

528.  ))

529.  return NGX_ERROR;

531.  //转成 json 格式

532.  for (size_t i = 0; i < list_resp._action_list.size(); ++i)

533.  {

534.  ActionThrft *ac = &list_resp._action_list[i];

535.  if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,

536.  "[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",

537.  ac->m_path.c_str(), ac->m_node_type, ac->m_status,

538.  ac->m_gcid.c_str(), ac->m_file_size

539.  ))

540.  return NGX_ERROR;

541.  }

543.  //这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕

544.  //关后端连接,回前端包

545.  chain_pair._chain_last->buf->last_buf = 1;

547.  for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)

548.  {

549.  ll = &cl->next;

550.  }

551.  *ll = chain_pair._chain_head;

553.  return NGX_OK;

554.  }

557.  static void

558.  ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)

559.  {

560.  ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

561.  "abort http xdrive_rc request");

562.  return;

563.  }

566.  static void

567.  ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)

568.  {

569.  ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,

570.  "finalize http xdrive_rc request");

571.  return;

572.  }

575.  static void *

576.  ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)

577.  {

578.  ngx_http_xdrive_rc_loc_conf_t *conf;

580.  conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,

581.  sizeof(ngx_http_xdrive_rc_loc_conf_t));

582.  if (conf == NULL)

583.  {

584.  return NULL;

585.  }

587.  conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;

588.  conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;

589.  conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;

591.  conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;

593.  /* the hardcoded values */

594.  conf->upstream.cyclic_temp_file = 0;

595.  conf->upstream.buffering = 0;

596.  conf->upstream.ignore_client_abort = 0;

597.  conf->upstream.send_lowat = 0;

598.  conf->upstream.bufs.num = 0;

599.  conf->upstream.busy_buffers_size = 0;

600.  conf->upstream.max_temp_file_size = 0;

601.  conf->upstream.temp_file_write_size = 0;

602.  conf->upstream.intercept_errors = 1;

603.  conf->upstream.intercept_404 = 1;

604.  conf->upstream.pass_request_headers = 0;

605.  conf->upstream.pass_request_body = 0;

607.  conf->uid_index = NGX_CONF_UNSET;

608.  conf->path_index = NGX_CONF_UNSET;

609.  conf->recusive_index = NGX_CONF_UNSET;

611.  return conf;

612.  }

615.  static char *

616.  ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)

617.  {

618.  ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;

619.  ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;

621.  ngx_conf_merge_msec_value(conf->upstream.connect_timeout,

622.  prev->upstream.connect_timeout, 60000);

624.  ngx_conf_merge_msec_value(conf->upstream.send_timeout,

625.  prev->upstream.send_timeout, 60000);

627.  ngx_conf_merge_msec_value(conf->upstream.read_timeout,

628.  prev->upstream.read_timeout, 60000);

630.  ngx_conf_merge_size_value(conf->upstream.buffer_size,

631.  prev->upstream.buffer_size,

632.  (size_t)ngx_pagesize);

634.  ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,

635.  prev->upstream.next_upstream,

636.  (NGX_CONF_BITMASK_SET

637.  | NGX_HTTP_UPSTREAM_FT_ERROR

638.  | NGX_HTTP_UPSTREAM_FT_TIMEOUT));

640.  if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)

641.  {

642.  conf->upstream.next_upstream = NGX_CONF_BITMASK_SET

643.  | NGX_HTTP_UPSTREAM_FT_OFF;

644.  }

646.  if (conf->upstream.upstream == NULL)

647.  {

648.  conf->upstream.upstream = prev->upstream.upstream;

649.  }

651.  if (conf->uid_index == NGX_CONF_UNSET) {

652.  conf->uid_index = prev->uid_index;

653.  }

654.  if (conf->path_index == NGX_CONF_UNSET) {

655.  conf->path_index = prev->path_index;

656.  }

657.  if (conf->recusive_index == NGX_CONF_UNSET) {

658.  conf->recusive_index = prev->recusive_index;

659.  } 

661.  return NGX_CONF_OK;

662.  }

665.  static char *

666.  ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)

667.  {

668.  ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;

670.  ngx_str_t *value;

671.  ngx_url_t u;

672.  ngx_http_core_loc_conf_t *clcf;

674.  if (mlcf->upstream.upstream)

675.  {

676.  return "is duplicate";

677.  }

679.  value = (ngx_str_t *)cf->args->elts;

681.  ngx_memzero(&u, sizeof(ngx_url_t));

683.  u.url = value[1];

684.  u.no_resolve = 1;

686.  mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);

687.  if (mlcf->upstream.upstream == NULL)

688.  {

689.  return (char *)(NGX_CONF_ERROR);

690.  }

692.  clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);

694.  clcf->handler = ngx_http_xdrive_rc_handler;

696.  if (clcf->name.data[clcf->name.len - 1] == '/')

697.  {

698.  clcf->auto_redirect = 1;

699.  }

701.  //保存变量index用

702.  mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);

703.  if (mlcf->uid_index == NGX_ERROR)

704.  {

705.  return (char *)(NGX_CONF_ERROR);

706.  }

707.  mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);

708.  if (mlcf->path_index == NGX_ERROR)

709.  {

710.  return (char *)(NGX_CONF_ERROR);

711.  }

712.  mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);

713.  if (mlcf->recusive_index == NGX_ERROR)

714.  {

715.  return (char *)(NGX_CONF_ERROR);

716.  }

718.  return NGX_CONF_OK;

719.  }

723.  static ngx_int_t

724.  ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)

725.  {

726.  ngx_http_variable_t *var, *v;

728.  for (v = ngx_http_proxy_vars; v->name.len; v++)

729.  {

730.  var = ngx_http_add_variable(cf, &v->name, v->flags);

731.  if (var == NULL)

732.  {

733.  return NGX_ERROR;

734.  }

736.  var->get_handler = v->get_handler;

737.  var->data = v->data;

738.  }

740.  return NGX_OK;

741.  }

代码中一些有意思的地方:

2.  //和buf差不多的思想的 buf chain
3.  typedef  struct
4.  {
5.  ngx_chain_t* _chain_head;
6.  ngx_chain_t* _chain_pos;
7.  ngx_chain_t* _chain_last;
8.  ngx_chain_t* _chain_tail;
9.  } 
10.  ngx_chain_pair_t;

12.  //从buf chain中读取len长内存出来
13.  size_t ngx_cdecl
14.  ngx_chain_read(ngx_chain_pair_t* chain_pair

15.  , uint8_t *buf, uint32_t len);

 //将buf写入到buf chain中

1.  ngx_int_t ngx_cdecl

2.  ngx_chain_write(ngx_pool_t* pool

3.  , ngx_chain_t** free_bufs

4.  , ngx_chain_pair_t* chain_pair

5.  , size_t write_chunk_size

6.  , const uint8_t *buf, uint32_t len);

 //写json或者xml之类回复有用

1.  ngx_int_t ngx_cdecl

2.  ngx_chain_sprintf(ngx_pool_t *pool

3.  , ngx_chain_t **free_bufs

4.  , ngx_chain_pair_t *chain_pair

5.  , size_t write_chunk_size

6.  , const char *fmt, ...);

下面是nginx配置文件中的关键部分

1.  location ~* /rc_list/([0-9]+).html$ { 
2.  xdrive_rc_buffer_size 4096;

3.  set $uid $1;

4.  set $path /;

5.  set $recusive 0;

6.  if ($query_string ~* (|&)recusive=(0|1)(|&)) {

7.  set $recusive $2;

8.  }

9.  xdrive_rc_pass 127.0.0.1:11001;

10.  }
  1. 解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
  2. recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
  3. 思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;

相关文章

网友评论

      本文标题:nginx 自定义协议 扩展模块开发

      本文链接:https://www.haomeiwen.com/subject/cydnvqtx.html