123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
'use strict';

// Instance index: first CLI argument, defaulting to 0. It is added to the base
// port below and is also interpolated into log messages further down the file.
let index = process.argv[2] || 0;
let fileName = __filename;
let basePort = 8390;
// Pre-release deployments are recognised by a "new" marker in the script path
// and listen on a separate port range.
const preReleaseMarkers = ['/new.', '/new-', '_new/'];
if (preReleaseMarkers.some((marker) => fileName.includes(marker))) {
    basePort = 8590;
}
// Parse explicitly as base 10 and fail fast on a non-numeric index instead of
// silently computing a NaN port.
const portOffset = Number.parseInt(index, 10);
if (Number.isNaN(portOffset)) {
    throw new Error(`Invalid ws instance index: ${index}`);
}
global['WS_PORTS'] = basePort + portOffset;
- // 加载扩展函数
- require('./extensions/function_extend.js');
- require('./conf/config.inc.js');
- require('./conf/code.inc.js');
- const WebSocket = require('ws');
- const OujRedis = require('./framework/lib/OujRedis.js');
- const Log = require('./framework/lib/CallLog.js');
- const Qs = require("querystring");
- const Url = require("url");
const CallLog = new Log();
const wss = new WebSocket.Server({
    port: global['WS_PORTS'],
});
// NOTE(review): getServerIp is not defined in the visible part of this file;
// presumably it is installed by one of the modules required above - confirm.
const serverIp = getServerIp();
// groupId => [client, ...]. The keys come from the client-supplied `group_id`
// query parameter, so a null-prototype object is used: ids such as
// "__proto__" or "constructor" must not resolve to inherited members.
let groupClients = Object.create(null);
// client_id => groupId
let clientMap = Object.create(null);
// Handle of the heartbeat timer; stays 0 until initWs() starts it.
let wsInterval = 0;
// Number of initRedis() invocations so far (only used in its log line).
let times = 0;
initRedis();
/**
 * Broadcast to group: sends `data` to every client registered under `groupId`
 * whose socket is OPEN, then logs a per-client report (state and elapsed
 * milliseconds) through CallLog and _log.
 *
 * @param {string|number} groupId - key into `groupClients`.
 * @param {*} data - payload handed unchanged to each client's `send()`.
 */
wss.broadcastToGroup = function (groupId, data) {
    // Read-only lookup: a broadcast to a group with no local clients must not
    // leave a permanent empty entry behind in `groupClients`.
    const registered = groupClients[groupId];
    const clients = Array.isArray(registered) ? registered : [];
    let startTime = Date.now();
    let msg = `ServerIp: ${serverIp}, No: ${index}th ws, group: ${groupId}, broadcast: ${clients.length} clients.\n`;
    clients.forEach(function each(client, position) {
        const i = position + 1;
        const client_id = client.client_id;
        if (client.readyState === WebSocket.OPEN) {
            client.send(data);
            const endTime = Date.now();
            // Time spent on this client only; the clock restarts per client.
            msg += `${i}th child:${client_id}, use ${endTime - startTime} ms.\n`;
            startTime = endTime;
        } else {
            msg += `${i}th error child:${client_id}, state: ${client.readyState}.\n`;
        }
    });
    // Drop sockets that are fully closed so the list does not grow without
    // bound; they are still reported once in the log built above.
    if (Array.isArray(registered)) {
        const remaining = registered.filter(function (client) {
            return client.readyState !== WebSocket.CLOSED;
        });
        if (remaining.length === 0) {
            delete groupClients[groupId];
        } else if (remaining.length !== registered.length) {
            groupClients[groupId] = remaining;
        }
    }
    CallLog.setFromUrl('/ws_group/broadcast');
    CallLog.logSelfCall(0, msg);
    _log(msg);
};
/**
 * Server-level 'close' handler. This event fires when the WebSocket server
 * itself shuts down and carries no client socket, so there is no per-client
 * id to read here. On shutdown the heartbeat timer is stopped and both
 * registries are emptied.
 */
wss.on('close', function close() {
    if (wsInterval !== 0) {
        clearInterval(wsInterval);
        wsInterval = 0;
    }
    // Empty the registries in place so their object identity is preserved.
    Object.keys(clientMap).forEach(function (clientId) {
        delete clientMap[clientId];
    });
    Object.keys(groupClients).forEach(function (groupId) {
        delete groupClients[groupId];
    });
});
/**
 * Handles a new client connection: reads `group_id` from the request URL's
 * query string, registers the socket under that group, and terminates the
 * socket when no group id was supplied.
 *
 * @param {object} ws - the accepted client socket.
 * @param {object} params - upgrade request; only `params.url` is read.
 */
wss.on('connection', function connection(ws, params) {
    initWs(ws);
    let query = {};
    try {
        query = Qs.parse(Url.parse(params.url).query);
    } catch (e) {
        // Keep going with an empty query (the socket is rejected below), but
        // leave a trace instead of swallowing the failure silently.
        _log('parse connection url error:' + (e && e.message));
    }
    const group_id = query['group_id'];
    CallLog.setFromUrl('/ws_group/connection');
    // The socket's file descriptor doubles as the client id.
    ws.client_id = ws._socket._handle.fd;
    let msg = '';
    if (group_id) {
        msg = 'connect new group_id:' + group_id + ', url:' + params.url;
        if (!groupClients[group_id]) {
            groupClients[group_id] = [];
        }
        groupClients[group_id].push(ws);
        clientMap[ws.client_id] = group_id;
        // Captured now because descriptors can be reused by later sockets.
        const clientId = ws.client_id;
        // Per-client cleanup: remove only this socket from its group and drop
        // the group entry once its last member has gone.
        ws.on('close', function onClientClose() {
            const members = groupClients[group_id];
            if (Array.isArray(members)) {
                const position = members.indexOf(ws);
                if (position !== -1) {
                    members.splice(position, 1);
                }
                if (members.length === 0) {
                    delete groupClients[group_id];
                }
            }
            if (clientMap[clientId] === group_id) {
                delete clientMap[clientId];
            }
        });
        CallLog.logSelfCall(CODE_SUCCESS, msg);
    } else {
        msg = 'connect unknown group_id:' + group_id + ', url:' + params.url;
        CallLog.logSelfCall(CODE_UNKNOW_ERROT, msg);
        ws.terminate();
    }
    _log(msg);
});
/**
 * Prepares a freshly accepted socket for heartbeat checking and, on the first
 * call, starts the shared 30-second heartbeat timer.
 *
 * @param {object} ws - client socket; gains an `isAlive` flag plus 'pong' and
 *   'error' listeners.
 */
function initWs(ws) {
    ws.isAlive = true;
    ws.on('pong', () => {
        ws.isAlive = true;
    });
    // An 'error' event with no listener is thrown as an uncaught exception,
    // which would take down the whole server for one misbehaving client.
    ws.on('error', (err) => {
        _log('ws error, client:' + ws.client_id + ', ' + (err && err.message));
    });
    // Check every 30 seconds whether each client is still responsive.
    if (wsInterval === 0) {
        wsInterval = setInterval(() => {
            wss.clients.forEach(function each(client) {
                // No pong since the previous sweep: consider the client dead.
                if (client.isAlive === false) {
                    _log('ws.terminate():');
                    return client.terminate();
                }
                client.isAlive = false;
                // Only ping sockets that are still open; a closing socket will
                // simply be terminated on the next sweep.
                // NOTE(review): some ws versions throw when pinging a socket
                // that is not open - confirm against the installed version.
                if (client.readyState === WebSocket.OPEN) {
                    client.ping();
                }
            });
        }, 30000);
    }
}
/**
 * Obtains the 'dw_chat' Redis client, (re)subscribes it to the
 * `chat:group:*` pattern, forwards every received message to the matching
 * WebSocket group, and starts the periodic health check.
 *
 * @returns {object} the Redis client returned by OujRedis.init().
 */
function initRedis() {
    _log('initRedis, times:' + times++);
    const objRedis = OujRedis.init('dw_chat');
    const pattern = 'chat:group:*';
    objRedis.punsubscribe(pattern);
    objRedis.psubscribe(pattern, function (err, count) {
        // `count` is the subscription count reported by the client.
        if (err) {
            _log('psubscribe error:' + (err && err.message));
            return;
        }
        _log('psubscribe success, count:' + count);
    });
    // Raw "channel => message" of the previously handled message. It is kept
    // separate from the decorated log text so the duplicate check below
    // compares like with like.
    let lastKey = null;
    objRedis.on('pmessage', function (pattern, channel, message) {
        const key = channel + ' => ' + message;
        // Prevent the same message from being pushed twice in a row.
        if (lastKey === key) {
            _log('【error】objRedis, repeat message:' + key);
            _log(key);
            return;
        }
        lastKey = key;
        CallLog.setFromUrl('/ws_group/redisMessage');
        let data;
        try {
            data = JSON.parse(message);
        } catch (e) {
            // A malformed payload must not take the whole process down.
            const parseMsg = `No: ${index}th ws, objRedis invalid json: ` + key;
            CallLog.logSelfCall(CODE_UNKNOW_ERROT, parseMsg);
            _log(parseMsg);
            return;
        }
        // chat:group [timestamp, type, group_id, data]
        const groupId = data ? data.group_id : undefined;
        let msg = key;
        if (groupId > 0) {
            const sendData = JSON.stringify({channel, data});
            wss.broadcastToGroup(groupId, sendData);
            msg = `No: ${index}th ws, objRedis broadcast: ` + key;
            CallLog.logSelfCall(CODE_SUCCESS, msg);
        } else {
            CallLog.logSelfCall(CODE_UNKNOW_ERROT, `No: ${index}th ws, objRedis miss groupId: ` + key);
        }
        _log(msg);
    });
    // Kick off the recurring health check for this client.
    checkRedis(objRedis);
    return objRedis;
}
/**
 * Health check for the Redis client: pings it, schedules the next check one
 * second later on success, and logs the error and re-runs initRedis() on
 * failure.
 *
 * @param {object} objRedis - Redis client exposing `ping(callback)`.
 * @returns {object} the same client that was passed in.
 */
function checkRedis(objRedis) {
    const recheck = () => {
        checkRedis(objRedis);
    };
    objRedis.ping((err) => {
        if (!err) {
            // Healthy: look again in one second.
            setTimeout(recheck, 1000);
            return;
        }
        console.error(err);
        initRedis();
    });
    return objRedis;
}
/**
 * Writes `msg` to stdout, prefixed with the current local date/time wrapped
 * in 【】 brackets.
 *
 * @param {*} msg - value to log; it is converted to a string.
 */
function _log(msg) {
    const stamp = new Date().toLocaleString();
    const line = '【'.concat(stamp, '】', msg);
    console.log(line);
}
|