123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- 'use strict';
- let index = process.argv[2] || 0;
- let fileName = __filename;
- let basePort = 8490;
- if (fileName.indexOf('/new.') !== -1 || fileName.indexOf('/new-') !== -1 || fileName.indexOf('_new/') !== -1) { // 预发布
- basePort = 8690;
- }
- global['WS_PORTS'] = basePort + parseInt(index);
- // 加载扩展函数
- require('./extensions/function_extend.js');
- require('./conf/config.inc.js');
- const WebSocket = require('ws');
- const OujRedis = require('./framework/lib/OujRedis.js');
- const Log = require('./framework/lib/CallLog.js');
- const TableHelper = require('./framework/lib/TableHelper');
- global['CONTROLLER_NAME'] = 'ws';
- global['ACTION_NAME'] = '';
- const CallLog = new Log();
- const wss = new WebSocket.Server({
- port: global['WS_PORTS'],
- });
- let objRedis = OujRedis.init('dw_chat');
- let objUser = new TableHelper('user_info', 'dw_chat');
- let objUserGroup = new TableHelper('user_group', 'dw_chat');
- wss.on('close', function close(ws) {
- console.log('disconnected:' + ws.client_id);
- });
- wss.on('connection', async function connection(ws, params) {
- // 检查孩子们是否还活着
- ws.isAlive = true;
- ws.on('pong', () => {
- ws.isAlive = true;
- // _log(`userId:${ws.user_id} accept pong!`);
- });
- ws.client_id = ws._socket._handle.fd;
- let query = {};
- try {
- query = require("querystring").parse(require('url').parse(params.url).query);
- } catch (e) {}
- let user_id = query['user_id'];
- let token = query['token'];
- let loginFail = false;
- if (!user_id || !token) {
- loginFail = true;
- } else {
- let key = `globals:token:${token}`;
- let value = await objRedis.get(key);
- // console.log('key:' + key + ', value:' + value);
- loginFail = value !== user_id;
- }
- let code = CODE_SUCCESS;
- let msg = `new connection, client_id:${ws.client_id}, user_id:${user_id}, url:${params.url}\n`;
- // 检查登录态
- if (loginFail) {
- code = CODE_USER_LOGIN_FAIL;
- msg += `check login fail, user_id: ${user_id}\n`;
- ws.terminate();
- } else {
- ws.user_id = user_id;
- let user = await getUserById(user_id);
- if (user) {
- ws.objRedis = initUserRedis(ws, user.user_id);
- } else {
- code = CODE_USER_LOGIN_FAIL;
- msg += `user_id:${user_id} not found in DB\n`;
- }
- }
- CallLog.setFromUrl('/chat/connection');
- CallLog.logSelfCall(code, msg);
- _log(msg)
- });
- // 每30秒检测一次客户端是否正常
- setInterval(() => {
- wss.clients.forEach(function each(ws) {
- if (ws.isAlive === false) {
- _log('ws.terminate():');
- // websocket有问题,则需要关闭redis
- ws.objRedis.disconnect(false); // true indicates we need reconnect
- ws.objRedis = null;
- ws.terminate();
- _log(`【error】account: ${ws.user_id}, ws.terminate. `);
- } else {
- ws.objRedis.ping(function (err, result) {
- if (err) {
- // 尝试重新链接
- ws.objRedis.disconnect(false); // true indicates we need reconnect
- ws.objRedis = initUserRedis(ws, ws.user_id);
- _log(`【error】account: ${ws.account}, objRedis.disconnect, and initUserRedis. `);
- }
- });
- ws.isAlive = false;
- ws.ping();
- }
- });
- }, 30000);
- function initUserRedis(ws, userId) {
- // 群聊天消息:chat:group:msg_xxx
- let objRedis = OujRedis.initNewOne('dw_chat');
- let list = [`chat:person:${userId}`, 'chat:group'];
- getUserGroupRel(userId).then(groupIdsList => {
- for (let k in groupIdsList) {
- list.push(`chat:group:${groupIdsList[k].group_id}`)
- }
- objRedis.unsubscribe(list);
- objRedis.subscribe(list, function (err, count) {
- // Now we are subscribed to both the 'news' and 'music' channels.
- // `count` represents the number of channels we are currently subscribed to.
- if (!err) {
- _log(`userId:${userId} subscribe success, count:${count}` + list);
- }
- });
- });
- let last_msg = null;
- objRedis.on('message', function (channel, message) {
- let data = JSON.parse(message);
- let msg = channel + ' => ' + message;
- // 加入群组
- if ('join_group' === data.type) {
- let groupId = data.group_id;
- if (groupId > 0) {
- let cl = `chat:group:${groupId}`;
- objRedis.subscribe(cl, function (err, count) {
- if (!err) {
- _log(`userId:${userId} join group, groupId:${groupId} count:${count}`);
- }
- });
- }
- } else if ('leave_group' === data.type) {
- if (data.group_id > 0) {
- let cl = `chat:group:${data.group_id}`;
- objRedis.unsubscribe(cl, function (err, count) {
- if (err) {
- _log(`userId:${userId} leave group err:${err}`)
- } else {
- _log(`userId:${userId} leave group, groupId:`)
- }
- })
- }
- }
- // 防止重复消息推送
- if (last_msg !== msg) {
- CallLog.setFromUrl('/ws/broadcast');
- if (ws.readyState === WebSocket.OPEN) {
- let sendData = JSON.stringify({channel, data});
- ws.send(sendData);
- CallLog.logSelfCall(CODE_SUCCESS, msg);
- } else {
- msg += `account:${ws.account}, client_id:${ws.client_id}, state: ${ws.readyState}.\n`;
- CallLog.logSelfCall(CODE_UNKNOW_ERROT, msg);
- }
- _log(msg);
- last_msg = msg;
- } else {
- _log('【error】objRedis, repeat message:' + last_msg);
- }
- });
- return objRedis;
- }
- /**
- * 通过用户Id获取用户信息
- * @param user_name
- * @returns {Promise<User>}
- */
- async function getUserById(user_id) {
- return objUser.getRow({user_id});
- }
- /**
- * 用户群关系信息
- * @param user_id
- * @returns {Promise<List>}
- */
- async function getUserGroupRel(user_id) {
- return objUserGroup.getAll({user_id});
- }
- function _log(msg) {
- let time = new Date().toLocaleString();
- console.log(`【${time}】${msg}`);
- }
|