// ws_im.js (6.2 KB)
  1. 'use strict';
  2. let index = process.argv[2] || 0;
  3. let fileName = __filename;
  4. let basePort = 8490;
  5. if (fileName.indexOf('/new.') !== -1 || fileName.indexOf('/new-') !== -1 || fileName.indexOf('_new/') !== -1) { // 预发布
  6. basePort = 8690;
  7. }
  8. global['WS_PORTS'] = basePort + parseInt(index);
  9. // 加载扩展函数
  10. require('./extensions/function_extend.js');
  11. require('./conf/config.inc.js');
  12. const WebSocket = require('ws');
  13. const OujRedis = require('./framework/lib/OujRedis.js');
  14. const Log = require('./framework/lib/CallLog.js');
  15. const TableHelper = require('./framework/lib/TableHelper');
  16. global['CONTROLLER_NAME'] = 'ws';
  17. global['ACTION_NAME'] = '';
  18. const CallLog = new Log();
  19. const wss = new WebSocket.Server({
  20. port: global['WS_PORTS'],
  21. });
  22. let objRedis = OujRedis.init('dw_chat');
  23. let objUser = new TableHelper('user_info', 'dw_chat');
  24. let objUserGroup = new TableHelper('user_group', 'dw_chat');
  25. wss.on('close', function close(ws) {
  26. console.log('disconnected:' + ws.client_id);
  27. });
  28. wss.on('connection', async function connection(ws, params) {
  29. // 检查孩子们是否还活着
  30. ws.isAlive = true;
  31. ws.on('pong', () => {
  32. ws.isAlive = true;
  33. // _log(`userId:${ws.user_id} accept pong!`);
  34. });
  35. ws.client_id = ws._socket._handle.fd;
  36. let query = {};
  37. try {
  38. query = require("querystring").parse(require('url').parse(params.url).query);
  39. } catch (e) {}
  40. let user_id = query['user_id'];
  41. let token = query['token'];
  42. let loginFail = false;
  43. if (!user_id || !token) {
  44. loginFail = true;
  45. } else {
  46. let key = `globals:token:${token}`;
  47. let value = await objRedis.get(key);
  48. // console.log('key:' + key + ', value:' + value);
  49. loginFail = value !== user_id;
  50. }
  51. let code = CODE_SUCCESS;
  52. let msg = `new connection, client_id:${ws.client_id}, user_id:${user_id}, url:${params.url}\n`;
  53. // 检查登录态
  54. if (loginFail) {
  55. code = CODE_USER_LOGIN_FAIL;
  56. msg += `check login fail, user_id: ${user_id}\n`;
  57. ws.terminate();
  58. } else {
  59. ws.user_id = user_id;
  60. let user = await getUserById(user_id);
  61. if (user) {
  62. ws.objRedis = initUserRedis(ws, user.user_id);
  63. } else {
  64. code = CODE_USER_LOGIN_FAIL;
  65. msg += `user_id:${user_id} not found in DB\n`;
  66. }
  67. }
  68. CallLog.setFromUrl('/chat/connection');
  69. CallLog.logSelfCall(code, msg);
  70. _log(msg)
  71. });
  72. // 每30秒检测一次客户端是否正常
  73. setInterval(() => {
  74. wss.clients.forEach(function each(ws) {
  75. if (ws.isAlive === false) {
  76. _log('ws.terminate():');
  77. // websocket有问题,则需要关闭redis
  78. ws.objRedis.disconnect(false); // true indicates we need reconnect
  79. ws.objRedis = null;
  80. ws.terminate();
  81. _log(`【error】account: ${ws.user_id}, ws.terminate. `);
  82. } else {
  83. ws.objRedis.ping(function (err, result) {
  84. if (err) {
  85. // 尝试重新链接
  86. ws.objRedis.disconnect(false); // true indicates we need reconnect
  87. ws.objRedis = initUserRedis(ws, ws.user_id);
  88. _log(`【error】account: ${ws.account}, objRedis.disconnect, and initUserRedis. `);
  89. }
  90. });
  91. ws.isAlive = false;
  92. ws.ping();
  93. }
  94. });
  95. }, 30000);
  96. function initUserRedis(ws, userId) {
  97. // 群聊天消息:chat:group:msg_xxx
  98. let objRedis = OujRedis.initNewOne('dw_chat');
  99. let list = [`chat:person:${userId}`, 'chat:group'];
  100. getUserGroupRel(userId).then(groupIdsList => {
  101. for (let k in groupIdsList) {
  102. list.push(`chat:group:${groupIdsList[k].group_id}`)
  103. }
  104. objRedis.unsubscribe(list);
  105. objRedis.subscribe(list, function (err, count) {
  106. // Now we are subscribed to both the 'news' and 'music' channels.
  107. // `count` represents the number of channels we are currently subscribed to.
  108. if (!err) {
  109. _log(`userId:${userId} subscribe success, count:${count}` + list);
  110. }
  111. });
  112. });
  113. let last_msg = null;
  114. objRedis.on('message', function (channel, message) {
  115. let data = JSON.parse(message);
  116. let msg = channel + ' => ' + message;
  117. // 加入群组
  118. if ('join_group' === data.type) {
  119. let groupId = data.group_id;
  120. if (groupId > 0) {
  121. let cl = `chat:group:${groupId}`;
  122. objRedis.subscribe(cl, function (err, count) {
  123. if (!err) {
  124. _log(`userId:${userId} join group, groupId:${groupId} count:${count}`);
  125. }
  126. });
  127. }
  128. } else if ('leave_group' === data.type) {
  129. if (data.group_id > 0) {
  130. let cl = `chat:group:${data.group_id}`;
  131. objRedis.unsubscribe(cl, function (err, count) {
  132. if (err) {
  133. _log(`userId:${userId} leave group err:${err}`)
  134. } else {
  135. _log(`userId:${userId} leave group, groupId:`)
  136. }
  137. })
  138. }
  139. }
  140. // 防止重复消息推送
  141. if (last_msg !== msg) {
  142. CallLog.setFromUrl('/ws/broadcast');
  143. if (ws.readyState === WebSocket.OPEN) {
  144. let sendData = JSON.stringify({channel, data});
  145. ws.send(sendData);
  146. CallLog.logSelfCall(CODE_SUCCESS, msg);
  147. } else {
  148. msg += `account:${ws.account}, client_id:${ws.client_id}, state: ${ws.readyState}.\n`;
  149. CallLog.logSelfCall(CODE_UNKNOW_ERROT, msg);
  150. }
  151. _log(msg);
  152. last_msg = msg;
  153. } else {
  154. _log('【error】objRedis, repeat message:' + last_msg);
  155. }
  156. });
  157. return objRedis;
  158. }
  159. /**
  160. * 通过用户Id获取用户信息
  161. * @param user_name
  162. * @returns {Promise<User>}
  163. */
  164. async function getUserById(user_id) {
  165. return objUser.getRow({user_id});
  166. }
  167. /**
  168. * 用户群关系信息
  169. * @param user_id
  170. * @returns {Promise<List>}
  171. */
  172. async function getUserGroupRel(user_id) {
  173. return objUserGroup.getAll({user_id});
  174. }
  175. function _log(msg) {
  176. let time = new Date().toLocaleString();
  177. console.log(`【${time}】${msg}`);
  178. }