'use strict'; let index = process.argv[2] || 0; let fileName = __filename; let basePort = 8490; if (fileName.indexOf('/new.') !== -1 || fileName.indexOf('/new-') !== -1 || fileName.indexOf('_new/') !== -1) { // 预发布 basePort = 8690; } global['WS_PORTS'] = basePort + parseInt(index); // 加载扩展函数 require('./extensions/function_extend.js'); require('./conf/config.inc.js'); const WebSocket = require('ws'); const OujRedis = require('./framework/lib/OujRedis.js'); const Log = require('./framework/lib/CallLog.js'); const TableHelper = require('./framework/lib/TableHelper'); global['CONTROLLER_NAME'] = 'ws'; global['ACTION_NAME'] = ''; const CallLog = new Log(); const wss = new WebSocket.Server({ port: global['WS_PORTS'], }); let objRedis = OujRedis.init('dw_chat'); let objUser = new TableHelper('user_info', 'dw_chat'); let objUserGroup = new TableHelper('user_group', 'dw_chat'); wss.on('close', function close(ws) { console.log('disconnected:' + ws.client_id); }); wss.on('connection', async function connection(ws, params) { // 检查孩子们是否还活着 ws.isAlive = true; ws.on('pong', () => { ws.isAlive = true; // _log(`userId:${ws.user_id} accept pong!`); }); ws.client_id = ws._socket._handle.fd; let query = {}; try { query = require("querystring").parse(require('url').parse(params.url).query); } catch (e) {} let user_id = query['user_id']; let token = query['token']; let loginFail = false; if (!user_id || !token) { loginFail = true; } else { let key = `globals:token:${token}`; let value = await objRedis.get(key); // console.log('key:' + key + ', value:' + value); loginFail = value !== user_id; } let code = CODE_SUCCESS; let msg = `new connection, client_id:${ws.client_id}, user_id:${user_id}, url:${params.url}\n`; // 检查登录态 if (loginFail) { code = CODE_USER_LOGIN_FAIL; msg += `check login fail, user_id: ${user_id}\n`; ws.terminate(); } else { ws.user_id = user_id; let user = await getUserById(user_id); if (user) { ws.objRedis = initUserRedis(ws, user.user_id); } else { code = CODE_USER_LOGIN_FAIL; msg += `user_id:${user_id} not found in DB\n`; } } CallLog.setFromUrl('/chat/connection'); CallLog.logSelfCall(code, msg); _log(msg) }); // 每30秒检测一次客户端是否正常 setInterval(() => { wss.clients.forEach(function each(ws) { if (ws.isAlive === false) { _log('ws.terminate():'); // websocket有问题,则需要关闭redis ws.objRedis.disconnect(false); // true indicates we need reconnect ws.objRedis = null; ws.terminate(); _log(`【error】account: ${ws.user_id}, ws.terminate. `); } else { ws.objRedis.ping(function (err, result) { if (err) { // 尝试重新链接 ws.objRedis.disconnect(false); // true indicates we need reconnect ws.objRedis = initUserRedis(ws, ws.user_id); _log(`【error】account: ${ws.account}, objRedis.disconnect, and initUserRedis. `); } }); ws.isAlive = false; ws.ping(); } }); }, 30000); function initUserRedis(ws, userId) { // 群聊天消息:chat:group:msg_xxx let objRedis = OujRedis.initNewOne('dw_chat'); let list = [`chat:person:${userId}`, 'chat:group']; getUserGroupRel(userId).then(groupIdsList => { for (let k in groupIdsList) { list.push(`chat:group:${groupIdsList[k].group_id}`) } objRedis.unsubscribe(list); objRedis.subscribe(list, function (err, count) { // Now we are subscribed to both the 'news' and 'music' channels. // `count` represents the number of channels we are currently subscribed to. if (!err) { _log(`userId:${userId} subscribe success, count:${count}` + list); } }); }); let last_msg = null; objRedis.on('message', function (channel, message) { let data = JSON.parse(message); let msg = channel + ' => ' + message; // 加入群组 if ('join_group' === data.type) { let groupId = data.group_id; if (groupId > 0) { let cl = `chat:group:${groupId}`; objRedis.subscribe(cl, function (err, count) { if (!err) { _log(`userId:${userId} join group, groupId:${groupId} count:${count}`); } }); } } else if ('leave_group' === data.type) { if (data.group_id > 0) { let cl = `chat:group:${data.group_id}`; objRedis.unsubscribe(cl, function (err, count) { if (err) { _log(`userId:${userId} leave group err:${err}`) } else { _log(`userId:${userId} leave group, groupId:`) } }) } } // 防止重复消息推送 if (last_msg !== msg) { CallLog.setFromUrl('/ws/broadcast'); if (ws.readyState === WebSocket.OPEN) { let sendData = JSON.stringify({channel, data}); ws.send(sendData); CallLog.logSelfCall(CODE_SUCCESS, msg); } else { msg += `account:${ws.account}, client_id:${ws.client_id}, state: ${ws.readyState}.\n`; CallLog.logSelfCall(CODE_UNKNOW_ERROT, msg); } _log(msg); last_msg = msg; } else { _log('【error】objRedis, repeat message:' + last_msg); } }); return objRedis; } /** * 通过用户Id获取用户信息 * @param user_name * @returns {Promise} */ async function getUserById(user_id) { return objUser.getRow({user_id}); } /** * 用户群关系信息 * @param user_id * @returns {Promise} */ async function getUserGroupRel(user_id) { return objUserGroup.getAll({user_id}); } function _log(msg) { let time = new Date().toLocaleString(); console.log(`【${time}】${msg}`); }