ws_group.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. 'use strict'
  2. let index = process.argv[2] || 0;
  3. let fileName = __filename;
  4. let basePort = 8390;
  5. if (fileName.indexOf('/new.') !== -1 || fileName.indexOf('/new-') !== -1 || fileName.indexOf('_new/') !== -1) { // 预发布
  6. basePort = 8590;
  7. }
  8. global['WS_PORTS'] = basePort + parseInt(index);
// Load the extension functions.
  10. require('./extensions/function_extend.js');
  11. require('./conf/config.inc.js');
  12. require('./conf/code.inc.js');
  13. const WebSocket = require('ws');
  14. const OujRedis = require('./framework/lib/OujRedis.js');
  15. const Log = require('./framework/lib/CallLog.js');
  16. const Qs = require("querystring");
  17. const Url = require("url");
  18. const CallLog = new Log();
  19. const wss = new WebSocket.Server({
  20. port: global['WS_PORTS'],
  21. });
  22. const serverIp = getServerIp();
  23. let groupClients = {}; // groupId => [client ... ]
  24. let clientMap = {};
  25. let wsInterval = 0;
  26. let times = 0;
  27. initRedis();
  28. // Broadcast to group.
  29. wss.broadcastToGroup = function (groupId, data) {
  30. let i = 1;
  31. let startTime = (new Date).getTime();
  32. if (!groupClients[groupId]) {
  33. groupClients[groupId] = [];
  34. }
  35. let msg = `ServerIp: ${serverIp}, No: ${index}th ws, group: ${groupId}, broadcast: ${groupClients[groupId].length} clients.\n`;
  36. groupClients[groupId].forEach(function each(client) {
  37. let client_id = client.client_id;
  38. if (client.readyState === WebSocket.OPEN) {
  39. client.send(data);
  40. let endTime = (new Date).getTime();
  41. let span = endTime - startTime;
  42. startTime = endTime;
  43. msg += `${i}th child:${client_id}, use ${span} ms.\n`;
  44. } else {
  45. msg += `${i}th error child:${client_id}, state: ${client.readyState}.\n`;
  46. }
  47. i++;
  48. });
  49. CallLog.setFromUrl('/ws_group/broadcast');
  50. CallLog.logSelfCall(0, msg);
  51. _log(msg);
  52. };
  53. wss.on('close', function close(ws) {
  54. let clientId = ws.client_id;
  55. let groupId = clientMap[clientId] || 0;
  56. delete clientMap[clientId];
  57. delete groupClients[groupId];
  58. });
  59. wss.on('connection', function connection(ws, params) {
  60. initWs(ws);
  61. let query = {};
  62. try {
  63. query = Qs.parse(Url.parse(params.url).query);
  64. } catch (e) {}
  65. let group_id = query['group_id'];
  66. CallLog.setFromUrl('/ws_group/connection');
  67. ws.client_id = ws._socket._handle.fd;
  68. let msg = '';
  69. if (group_id) {
  70. msg = 'connect new group_id:' + group_id + ', url:' + params.url;
  71. if (!groupClients[group_id]) {
  72. groupClients[group_id] = [];
  73. }
  74. groupClients[group_id].push(ws);
  75. clientMap[ws.client_id] = group_id;
  76. CallLog.logSelfCall(CODE_SUCCESS, msg);
  77. } else {
  78. msg = 'connect unknown group_id:' + group_id + ', url:' + params.url;
  79. CallLog.logSelfCall(CODE_UNKNOW_ERROT, msg);
  80. ws.terminate();
  81. }
  82. _log(msg);
  83. });
  84. function initWs(ws) {
  85. ws.isAlive = true;
  86. ws.on('pong', () => {
  87. ws.isAlive = true;
  88. });
  89. // 每30秒检测一次客户端是否正常
  90. if (wsInterval === 0) {
  91. wsInterval = setInterval(() => {
  92. wss.clients.forEach(function each(ws) {
  93. if (ws.isAlive === false) {
  94. _log('ws.terminate():');
  95. return ws.terminate();
  96. }
  97. ws.isAlive = false;
  98. ws.ping();
  99. });
  100. }, 30000);
  101. }
  102. }
  103. function initRedis() {
  104. _log('initRedis, times:' + times++);
  105. let objRedis = OujRedis.init('dw_chat');
  106. let pattern = 'chat:group:*';
  107. objRedis.punsubscribe(pattern);
  108. objRedis.psubscribe(pattern, function (err, count) {
  109. // Now we are subscribed to both the 'news' and 'music' channels.
  110. // `count` represents the number of channels we are currently subscribed to.
  111. if (!err) {
  112. _log('psubscribe success, count:' + count);
  113. }
  114. });
  115. let last_msg = null;
  116. objRedis.on('pmessage', function (pattern, channel, message) {
  117. let data = JSON.parse(message);
  118. let msg = channel + ' => ' + message;
  119. // 防止重复消息推送
  120. if (last_msg !== msg) {
  121. CallLog.setFromUrl('/ws_group/redisMessage');
  122. // chat:group [timestamp, type, group_id, data]
  123. let groupId = data.group_id;
  124. if (groupId > 0) {
  125. let sendData = JSON.stringify({channel, data});
  126. wss.broadcastToGroup(groupId, sendData);
  127. msg = `No: ${index}th ws, objRedis broadcast: ` + msg;
  128. CallLog.logSelfCall(CODE_SUCCESS, msg);
  129. } else {
  130. CallLog.logSelfCall(CODE_UNKNOW_ERROT, `No: ${index}th ws, objRedis miss groupId: ` + msg);
  131. }
  132. last_msg = msg;
  133. } else {
  134. _log('【error】objRedis, repeat message:' + last_msg);
  135. }
  136. _log(msg);
  137. });
  138. // 点一次要初始化定时器
  139. checkRedis(objRedis);
  140. return objRedis;
  141. }
  142. function checkRedis(objRedis) {
  143. // 每秒检测redis是否正常
  144. objRedis.ping(function (err, result) {
  145. if (err) {
  146. console.error(err);
  147. initRedis();
  148. } else {
  149. //_log(result);
  150. setTimeout(function() {
  151. objRedis = checkRedis(objRedis);
  152. }, 1000);
  153. }
  154. });
  155. return objRedis;
  156. }
  157. function _log(msg) {
  158. let time = new Date().toLocaleString();
  159. console.log(`【${time}】${msg}`);
  160. }