var Q = require('q');
var oujRedis = require('./oujRedis.js');
var TableHelper = require('./TableHelper.js');
var r2m_configs = require('../../conf/r2m_config.inc.js');
var util = require('util');
var php = require('phpjs');

/**
 * @type {co.co|exports|module.exports}
 */
var co = require('co');

/**
 * Builds the Error thrown for cache configuration problems.
 *
 * CODE_REDIS_ERROR is not defined in this file. It is attached as `err.code`
 * only when some other module has published it as a global, so that raising
 * the error can never itself fail with a ReferenceError.
 *
 * @param {string} message
 * @returns {Error}
 */
function createRedisError(message) {
    var err = new Error(message);
    if (typeof CODE_REDIS_ERROR !== 'undefined') {
        err.code = CODE_REDIS_ERROR;
    }
    return err;
}

/**
 * Computes the row cache key for `args` on `instance` and throws when no key
 * could be produced.
 *
 * @param {Redis2MySql} instance
 * @param {Object} args
 * @returns {string}
 * @throws {Error} when `_getRowKey` yields an empty key
 */
function requireRowKey(instance, args) {
    var cacheKey = instance._getRowKey(args);
    if (!cacheKey) {
        throw createRedisError("没设置key:cacheKey," + JSON.stringify(instance.cacheInfo));
    }
    return cacheKey;
}

/**
 * Base class providing create / read / update / delete on a table through
 * TableHelper, with a Redis cache in front of it.
 *
 * The cache settings are read from r2m_configs[dbKey][tableName]; the fields
 * used by this class are `key`, `all_key` and `ttl`.
 *
 * @param {string} tableName table name, also used as the cache key prefix
 * @param {string} [dbKey='default'] key selecting the database configuration
 * @param {string} [cacheKey='default'] key selecting the Redis configuration
 * @throws {Error} when no cache configuration exists for the table
 * @author benzhan
 */
function Redis2MySql(tableName, dbKey, cacheKey) {
    this.tableName = tableName;
    this.dbKey = dbKey || 'default';
    this.cacheKey = cacheKey || 'default';

    // Guard the first lookup so an unknown dbKey reports the same
    // "not configured" error instead of an opaque TypeError.
    var dbConfig = r2m_configs[this.dbKey];
    this.cacheInfo = dbConfig ? dbConfig[tableName] : undefined;
    if (!this.cacheInfo) {
        throw createRedisError("redis没配置table name:" + tableName);
    }

    /**
     * @type {TableHelper}
     */
    this.objTable = new TableHelper(tableName, this.dbKey);

    /**
     * @type {Redis}
     */
    this.objRedis = oujRedis.init(this.cacheKey);

    return this;
}

/**
 * Builds the cache key of a single row.
 *
 * The comma separated field list in cacheInfo['key'] is looked up in `args`;
 * every field with a truthy value contributes a `field=value` segment (array
 * values are joined with '|').
 *
 * @param {Object} args
 * @returns {string} `<table>:row:<segments>` or `<table>:row` when no field matched
 * @private
 */
Redis2MySql.prototype._getRowKey = function(args) {
    var fields = this.cacheInfo['key'].split(',');
    var cacheKeys = [];

    for (var i = 0; i < fields.length; i++) {
        var field = fields[i].trim();
        var value = args[field];
        if (value) {
            if (Array.isArray(value)) {
                cacheKeys.push(field + "=" + value.join('|'));
            } else {
                cacheKeys.push(field + "=" + value);
            }
        }
    }

    var cacheKey = this.tableName + ":row";
    if (cacheKeys.length > 0) {
        return cacheKey + ':' + cacheKeys.join(':');
    }
    return cacheKey;
};

/**
 * Builds the cache key prefix used for list (getAll) results.
 *
 * When cacheInfo['all_key'] is not set, or any of its fields is missing,
 * null or an array in `args`, the list is filed under `<table>:all:others`.
 *
 * @param {Object} args
 * @returns {string}
 * @private
 */
Redis2MySql.prototype._getAllKey = function(args) {
    var cacheKey = this.tableName + ":all";
    var otherCacheKey = cacheKey + ":others";

    var allKey = this.cacheInfo['all_key'];
    if (!allKey) {
        return otherCacheKey;
    }

    var fields = php.explode(',', allKey);
    var cacheKeys = [];
    for (var i = 0; i < fields.length; i++) {
        var field = fields[i].trim();
        var value = args[field];
        if (Array.isArray(value) || typeof value == 'undefined' || value === null) {
            // An array value or a missing field cannot be bucketed: use "others".
            return otherCacheKey;
        }
        cacheKeys.push(field + "=" + value);
    }

    if (cacheKeys.length === 0) {
        return otherCacheKey;
    }
    return cacheKey + ':' + cacheKeys.join(':');
};

/**
 * Reads one row, from the Redis hash when present, otherwise from the table.
 * A row loaded from the table is written back to the cache without waiting
 * for the write to finish.
 *
 * @param {Object} where
 * @returns {Promise} resolves with the cached hash or the table row
 * @public
 */
Redis2MySql.prototype.getRow = function getRow(where) {
    var self = this;
    var cacheKey = self._getRowKey(where);

    return co(function*() {
        var cached = yield self.objRedis.hgetall(cacheKey);
        if (!php.empty(cached)) {
            return cached;
        }

        // Cache miss: rebuild from the database.
        var row = yield self.objTable.getRow(where);
        if (!php.empty(row)) {
            // Fire and forget: the caller does not wait for the cache write.
            self._setRowCache(row);
        }
        return row;
    });
};

/**
 * Reads multiple rows, using a JSON string cached in Redis when available.
 *
 * @param {Object} where
 * @param {Object} keyWord query keywords, e.g. '_field', '_where', '_limit',
 *        '_sortKey', '_sortDir', '_lockRow', '_tableName'
 * @param {boolean} [updateList] when truthy, skip the cache read and refresh the cache
 * @returns {Promise} resolves with the rows
 * @public
 */
Redis2MySql.prototype.getAll = function(where, keyWord, updateList) {
    var self = this;
    var args = php.array_merge(where, keyWord);

    // Long query strings are hashed to keep the cache key short.
    var key = php.http_build_query(args);
    if (key.length > 32) {
        key = php.md5(key);
    }

    return co(function*() {
        var cacheKey = self._getAllKey(where) + ":" + key;

        if (!updateList) {
            var cached = yield self.objRedis.get(cacheKey);
            if (cached) {
                return JSON.parse(cached);
            }
        }

        var data = yield self.objTable.getAll(where, keyWord);

        // The remainder only syncs Redis; it is not awaited.
        var pipeline = self.objRedis.pipeline();
        pipeline.set(cacheKey, JSON.stringify(data));
        if (self.cacheInfo['ttl'] > 0) {
            pipeline.expire(cacheKey, self.cacheInfo['ttl']);
        }
        pipeline.exec();

        return data;
    });
};

/**
 * Inserts one row into the table.
 *
 * @param {Object} args
 * @param {boolean} [updateList=true] pass false to keep cached lists untouched
 * @returns {Promise} resolves with the result of TableHelper#addObject
 * @public
 */
Redis2MySql.prototype.addObject = function addObject(args, updateList) {
    var self = this;
    var shouldUpdateList = updateList !== false;

    return co(function*() {
        var ret = yield self.objTable.addObject(args);
        if (shouldUpdateList) {
            self.delListCache(args);
        }
        return ret;
    });
};

/**
 * Deletes cached lists affected by a change matching `where`.
 *
 * When `where` maps to the "others" bucket every list key of the table is
 * removed; otherwise the matching bucket and the "others" bucket are removed.
 *
 * @param {Object} where
 * @returns {Promise} resolves with 1
 * @public
 */
Redis2MySql.prototype.delListCache = function(where) {
    var self = this;
    var cacheKey = self._getAllKey(where);
    var otherKey = self.tableName + ":all:others";

    return co(function*() {
        var keys;
        if (cacheKey === otherKey) {
            // Cannot tell which buckets are affected: clear all list keys.
            keys = yield self.objRedis.keys(self.tableName + ":all" + '*');
        } else {
            // Clear the matching bucket and, in addition, the "others" bucket.
            var bucketKeys = yield self.objRedis.keys(cacheKey + '*');
            var otherKeys = yield self.objRedis.keys(otherKey + '*');
            keys = php.array_merge(bucketKeys, otherKeys);
        }

        if (keys.length > 0) {
            self.objRedis.del(keys);
        }
        return 1;
    });
};

/**
 * Writes a row into its Redis hash and applies the configured ttl, if any.
 *
 * @param {Object} args the row
 * @returns {*} whatever the pipeline's exec() returns
 * @throws {Error} when no row key can be built
 * @private
 */
Redis2MySql.prototype._setRowCache = function(args) {
    var cacheKey = requireRowKey(this, args);

    var pipeline = this.objRedis.pipeline();
    pipeline.hmset(cacheKey, args);
    if (this.cacheInfo['ttl'] > 0) {
        pipeline.expire(cacheKey, this.cacheInfo['ttl']);
    }
    return pipeline.exec();
};

/**
 * Updates the Redis hash of a row, but only when that hash already exists.
 *
 * @param {Object} args
 * @returns {Promise} resolves with 1 when the cache was updated, otherwise undefined
 * @throws {Error} when no row key can be built
 * @private
 */
Redis2MySql.prototype._updateRowCache = function(args) {
    var self = this;
    var cacheKey = requireRowKey(self, args);

    return co(function *() {
        var flag = yield self.objRedis.exists(cacheKey);
        if (flag == 1) {
            var pipeline = self.objRedis.pipeline();
            pipeline.hmset(cacheKey, args);
            if (self.cacheInfo['ttl'] > 0) {
                pipeline.expire(cacheKey, self.cacheInfo['ttl']);
            }
            yield pipeline.exec();
            return 1;
        }
    });
};

/**
 * Updates rows in the table and refreshes the row cache when rows changed.
 *
 * @param {Object} args the new values
 * @param {Object} where the update condition
 * @param {boolean} [updateList=true] pass false to keep cached lists untouched
 * @returns {Promise} resolves with the result of TableHelper#updateObject
 * @public
 */
Redis2MySql.prototype.updateObject = function(args, where, updateList) {
    var self = this;
    var shouldUpdateList = updateList !== false;

    return co(function*() {
        var result = yield self.objTable.updateObject(args, where);

        if (result.affectedRows > 0) {
            // The cached row is addressed by the condition fields plus the new values.
            yield self._updateRowCache(php.array_merge(args, where));
            if (shouldUpdateList) {
                self.delListCache(where);
            }
        }
        return result;
    });
};

/**
 * Replaces a row in the table and rewrites its row cache.
 *
 * @param {Object} args
 * @param {boolean} [updateList=true] pass false to keep cached lists untouched
 *        (defaults to true, consistent with addObject / updateObject / delObject)
 * @returns {Promise} resolves with the result of TableHelper#replaceObject
 * @public
 */
Redis2MySql.prototype.replaceObject = function(args, updateList) {
    var self = this;
    var shouldUpdateList = updateList !== false;

    return co(function*() {
        var result = yield self.objTable.replaceObject(args);
        yield self._setRowCache(args);
        if (shouldUpdateList) {
            self.delListCache(args);
        }
        return result;
    });
};

/**
 * Deletes rows from the table and drops the matching row cache.
 *
 * @param {Object} where
 * @param {boolean} [updateList=true] pass false to keep cached lists untouched
 * @returns {Promise} resolves with the result of TableHelper#delObject
 * @public
 */
Redis2MySql.prototype.delObject = function(where, updateList) {
    var self = this;
    var shouldUpdateList = updateList !== false;

    return co(function*() {
        var result = yield self.objTable.delObject(where);
        self._delRowCache(where);
        if (shouldUpdateList) {
            self.delListCache(where);
        }
        return result;
    });
};

/**
 * Deletes the Redis hash of a row.
 *
 * @param {Object} where
 * @returns {*} whatever the Redis client's del() returns
 * @throws {Error} when no row key can be built
 * @private
 */
Redis2MySql.prototype._delRowCache = function(where) {
    var cacheKey = requireRowKey(this, where);
    return this.objRedis.del(cacheKey);
};

/**
 * Closes the table helper and disconnects from Redis.
 *
 * @public
 */
Redis2MySql.prototype.close = function() {
    if (this.objTable) {
        this.objTable.close();
    }

    if (this.objRedis) {
        this.objRedis.disconnect();
    }
};

module.exports = Redis2MySql;