123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- var Q = require('q');
- var oujRedis = require('./oujRedis.js');
- var TableHelper = require('./TableHelper.js');
- var r2m_configs = require('../../conf/r2m_config.inc.js');
- var util = require('util');
- var php = require('phpjs');
- /**
- * @type {co.co|exports|module.exports}
- */
- var co = require('co');
- /**
- * @type {Redis2MySql}
- */
- var that;
- /**
- * 基本类,提供增删改查
- * @param {string} dbKey
- * @author benzhan
- */
- function Redis2MySql(tableName, dbKey, cacheKey) {
- this.tableName = tableName;
- this.dbKey = dbKey || 'default';
- this.cacheKey = cacheKey || 'default';
- this.cacheInfo = r2m_configs[this.dbKey][tableName];
- if (!this.cacheInfo) {
- throw new Error("redis没配置table name:" + tableName, CODE_REDIS_ERROR);
- }
- /**
- * @type {TableHelper}
- */
- this.objTable = new TableHelper(tableName, this.dbKey);
- /**
- * @type {Redis}
- */
- this.objRedis = oujRedis.init(this.cacheKey);
- that = this;
- return this;
- }
- /**
- * 获取行的key
- * @param args
- * @returns {string}
- * @private
- */
- Redis2MySql.prototype._getRowKey = function(args) {
- var key = this.cacheInfo['key'];
- var keys = key.split(',');
- var cacheKeys = [];
- for (var i in keys) {
- var key = keys[i].trim();
- if (args[key]) {
- if (util.isArray(args[key])) {
- cacheKeys.push(key + "=" + join('|', args[key]));
- } else {
- cacheKeys.push(key + "=" + args[key]);
- }
- }
- }
- var cacheKey = this.tableName + ":row";
- if (cacheKey) {
- return cacheKey + ':' + cacheKeys.join(':');
- } else {
- return cacheKey;
- }
- }
- /**
- * 获取getAll的key
- * @param args
- * @returns {string}
- * @private
- */
- Redis2MySql.prototype._getAllKey = function(args) {
- var cacheKey = this.tableName + ":all";
- var otherCacheKey = cacheKey + ":others";
- var key = this.cacheInfo['all_key'];
- if (key) {
- var keys = php.explode(',', key);
- var cacheKeys = [];
- for (var i in keys) {
- var key = keys[i].trim();
- if (util.isArray(args[key]) || typeof args[key] == 'undefined' || args[key] === null) {
- // 如果有数组,或者有key不存在,则归类到others
- return otherCacheKey;
- } else {
- cacheKeys.push(key + "=" + args[key]);
- }
- }
- if (cacheKeys) {
- return cacheKey + ':' + cacheKeys.join(':');
- } else {
- return otherCacheKey;
- }
- } else {
- return otherCacheKey;
- }
- }
- /**
- * 获取一个key的数据
- * @param where
- * @return <NULL, array>
- * @public
- */
- Redis2MySql.prototype.getRow = function getRow(where) {
- var cacheKey = this._getRowKey(where);
- var deferred = co(function*() {
- var data = yield that.objRedis.hgetall(cacheKey);
- if (php.empty(data)) {
- // 从数据库重建
- var row = yield that.objTable.getRow(where);
- if (!php.empty(row)) {
- // 设置缓存,无需等待成功就能返回
- that._setRowCache(row);
- }
- return row;
- } else {
- return data;
- }
- });
- return deferred;
- }
- /**
- * 读取多行数据
- * @param where
- * @param keyWord 查询关键字, array('_field', '_where', '_limit', '_sortKey', '_sortDir', '_lockRow', '_tableName')
- * @param bool updateList 是否强制更新缓存
- * @return array:
- * @public
- */
- Redis2MySql.prototype.getAll = function(where, keyWord, updateList) {
- var args = php.array_merge(where, keyWord);
- var key = php.http_build_query(args);
- if (key.length > 32) {
- key = php.md5(key);
- } else {
- key = php.http_build_query(args);
- }
- return co(function*() {
- var cacheKey = that._getAllKey(where) + ":" + key;
- if (!updateList) {
- var data = yield that.objRedis.get(cacheKey)
- if (data) {
- return JSON.parse(data);
- }
- }
- var data = yield that.objTable.getAll(where, keyWord);
- // 以下内容只是用来同步redis用的,可以异步跳过
- var pipeline = that.objRedis.pipeline();
- pipeline.set(cacheKey, JSON.stringify(data));
- if (that.cacheInfo['ttl'] > 0) {
- pipeline.expire(cacheKey, that.cacheInfo['ttl']);
- }
- pipeline.exec();
- return data;
- });
- }
- /**
- * 增加一行数据
- * @param args
- * @param boolean updateList
- * @return int
- * @public
- */
- Redis2MySql.prototype.addObject = function addObject(args, updateList) {
- updateList = updateList === false ? updateList : true;
- var deferred = co(function*() {
- var ret = yield that.objTable.addObject(args);
- if (updateList) {
- that.delListCache(args);
- }
- return ret;
- });
- return deferred;
- }
- /**
- * 删除列表的缓存
- * @param where
- * @public
- */
- Redis2MySql.prototype.delListCache = function(where) {
- var cacheKey = that._getAllKey(where);
- var otherKey = this.tableName + ":all:others";
- var deferred = co(function*() {
- var keys = [];
- if (cacheKey === otherKey) {
- // 需要清除所有key删除
- cacheKey = that.tableName + ":all";
- keys = yield that.objRedis.keys(cacheKey + '*');
- } else {
- // 除了删除当前keys,还需要删除others
- keys = yield that.objRedis.keys(cacheKey + '*');
- var keys1 = yield that.objRedis.keys(otherKey + '*');
- keys = php.array_merge(keys, keys1);
- }
- if (keys.length > 0) {
- that.objRedis.del(keys);
- }
- return 1;
- });
- return deferred;
- }
- /**
- * 设置行的缓存
- * @param args
- * @returns {Array|{index: number, input: string}|*|{arity, flags, keyStart, keyStop, step}}
- * @private
- */
- Redis2MySql.prototype._setRowCache = function(args) {
- var cacheKey = this._getRowKey(args);
- if (!cacheKey) {
- var msg = "没设置key:cacheKey," . JSON.stringify(this.cacheInfo);
- throw new Error(msg, CODE_REDIS_ERROR);
- }
- var pipeline = this.objRedis.pipeline();
- pipeline.hmset(cacheKey, args);
- if (this.cacheInfo['ttl'] > 0) {
- pipeline.expire(cacheKey, this.cacheInfo['ttl']);
- }
- return pipeline.exec();
- }
- /**
- * 更新行缓存
- * @param args
- * @returns {*}
- * @private
- */
- Redis2MySql.prototype._updateRowCache = function(args) {
- var cacheKey = this._getRowKey(args);
- if (!cacheKey) {
- var msg = "没设置key:cacheKey," . JSON.stringify(this.cacheInfo);
- throw new Error(msg, CODE_REDIS_ERROR);
- }
- var deferred = co(function *() {
- var flag = yield that.objRedis.exists(cacheKey);
- if (flag == 1) {
- var pipeline = that.objRedis.pipeline();
- pipeline.hmset(cacheKey, args);
- if (that.cacheInfo['ttl'] > 0) {
- pipeline.expire(cacheKey, that.cacheInfo['ttl']);
- }
- yield pipeline.exec();
- return 1;
- }
- });
- return deferred;
- }
- /**
- * 修改一个key的数据
- * @param args 更新的内容
- * @param where 更新的条件
- * @param boolean updateList
- * @return int 影响行数
- * @public
- */
- Redis2MySql.prototype.updateObject = function(args, where, updateList) {
- updateList = updateList === false ? updateList : true;
- var deferred = co(function*() {
- var result = yield that.objTable.updateObject(args, where);
- args = php.array_merge(args, where);
- if (result.affectedRows > 0) {
- yield that._updateRowCache(args);
- if (updateList) {
- that.delListCache(where);
- }
- }
- return result;
- });
- return deferred;
- }
- /**
- * 设置一个key的数据
- * @param args
- * @param boolean updateList
- * @return int 影响行数
- * @public
- */
- Redis2MySql.prototype.replaceObject = function(args, updateList) {
- var deferred = co(function*() {
- var result = yield that.objTable.replaceObject(args);
- yield that._setRowCache(args);
- if (updateList) {
- that.delListCache(args);
- }
- return result;
- });
- return deferred;
- }
- /**
- * 删除数据
- * @param array $where
- * @throws RedisException
- * @return unknown
- * @public
- */
- Redis2MySql.prototype.delObject = function(where, updateList) {
- updateList = updateList === false ? updateList : true;
- var deferred = co(function*() {
- var result = yield that.objTable.delObject(where);
- that._delRowCache(where);
- if (updateList) {
- that.delListCache(where);
- }
- return result;
- });
- return deferred;
- }
- /**
- * 删除行的缓存
- * @param where
- * @throws RedisException
- * @private
- */
- Redis2MySql.prototype._delRowCache = function(where) {
- var cacheKey = this._getRowKey(where);
- if (!cacheKey) {
- var msg = "没设置key:cacheKey," . JSON.stringify(this.cacheInfo);
- throw new Error(msg, CODE_REDIS_ERROR);
- }
- return this.objRedis.del(cacheKey);
- }
- /**
- * 关闭连接
- * @public
- */
- Redis2MySql.prototype.close = function() {
- if (this.objTable) {
- this.objTable.close();
- }
- if (this.objRedis) {
- this.objRedis.disconnect();
- }
- }
- module.exports = Redis2MySql;
|