Redis2MySql.js 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. var Q = require('q');
  2. var oujRedis = require('./oujRedis.js');
  3. var TableHelper = require('./TableHelper.js');
  4. var r2m_configs = require('../../conf/r2m_config.inc.js');
  5. var util = require('util');
  6. var php = require('phpjs');
  7. /**
  8. * @type {co.co|exports|module.exports}
  9. */
  10. var co = require('co');
  11. /**
  12. * @type {Redis2MySql}
  13. */
  14. var that;
  15. /**
  16. * 基本类,提供增删改查
  17. * @param {string} dbKey
  18. * @author benzhan
  19. */
  20. function Redis2MySql(tableName, dbKey, cacheKey) {
  21. this.tableName = tableName;
  22. this.dbKey = dbKey || 'default';
  23. this.cacheKey = cacheKey || 'default';
  24. this.cacheInfo = r2m_configs[this.dbKey][tableName];
  25. if (!this.cacheInfo) {
  26. throw new Error("redis没配置table name:" + tableName, CODE_REDIS_ERROR);
  27. }
  28. /**
  29. * @type {TableHelper}
  30. */
  31. this.objTable = new TableHelper(tableName, this.dbKey);
  32. /**
  33. * @type {Redis}
  34. */
  35. this.objRedis = oujRedis.init(this.cacheKey);
  36. that = this;
  37. return this;
  38. }
  39. /**
  40. * 获取行的key
  41. * @param args
  42. * @returns {string}
  43. * @private
  44. */
  45. Redis2MySql.prototype._getRowKey = function(args) {
  46. var key = this.cacheInfo['key'];
  47. var keys = key.split(',');
  48. var cacheKeys = [];
  49. for (var i in keys) {
  50. var key = keys[i].trim();
  51. if (args[key]) {
  52. if (util.isArray(args[key])) {
  53. cacheKeys.push(key + "=" + join('|', args[key]));
  54. } else {
  55. cacheKeys.push(key + "=" + args[key]);
  56. }
  57. }
  58. }
  59. var cacheKey = this.tableName + ":row";
  60. if (cacheKey) {
  61. return cacheKey + ':' + cacheKeys.join(':');
  62. } else {
  63. return cacheKey;
  64. }
  65. }
  66. /**
  67. * 获取getAll的key
  68. * @param args
  69. * @returns {string}
  70. * @private
  71. */
  72. Redis2MySql.prototype._getAllKey = function(args) {
  73. var cacheKey = this.tableName + ":all";
  74. var otherCacheKey = cacheKey + ":others";
  75. var key = this.cacheInfo['all_key'];
  76. if (key) {
  77. var keys = php.explode(',', key);
  78. var cacheKeys = [];
  79. for (var i in keys) {
  80. var key = keys[i].trim();
  81. if (util.isArray(args[key]) || typeof args[key] == 'undefined' || args[key] === null) {
  82. // 如果有数组,或者有key不存在,则归类到others
  83. return otherCacheKey;
  84. } else {
  85. cacheKeys.push(key + "=" + args[key]);
  86. }
  87. }
  88. if (cacheKeys) {
  89. return cacheKey + ':' + cacheKeys.join(':');
  90. } else {
  91. return otherCacheKey;
  92. }
  93. } else {
  94. return otherCacheKey;
  95. }
  96. }
  97. /**
  98. * 获取一个key的数据
  99. * @param where
  100. * @return <NULL, array>
  101. * @public
  102. */
  103. Redis2MySql.prototype.getRow = function getRow(where) {
  104. var cacheKey = this._getRowKey(where);
  105. var deferred = co(function*() {
  106. var data = yield that.objRedis.hgetall(cacheKey);
  107. if (php.empty(data)) {
  108. // 从数据库重建
  109. var row = yield that.objTable.getRow(where);
  110. if (!php.empty(row)) {
  111. // 设置缓存,无需等待成功就能返回
  112. that._setRowCache(row);
  113. }
  114. return row;
  115. } else {
  116. return data;
  117. }
  118. });
  119. return deferred;
  120. }
  121. /**
  122. * 读取多行数据
  123. * @param where
  124. * @param keyWord 查询关键字, array('_field', '_where', '_limit', '_sortKey', '_sortDir', '_lockRow', '_tableName')
  125. * @param bool updateList 是否强制更新缓存
  126. * @return array:
  127. * @public
  128. */
  129. Redis2MySql.prototype.getAll = function(where, keyWord, updateList) {
  130. var args = php.array_merge(where, keyWord);
  131. var key = php.http_build_query(args);
  132. if (key.length > 32) {
  133. key = php.md5(key);
  134. } else {
  135. key = php.http_build_query(args);
  136. }
  137. return co(function*() {
  138. var cacheKey = that._getAllKey(where) + ":" + key;
  139. if (!updateList) {
  140. var data = yield that.objRedis.get(cacheKey)
  141. if (data) {
  142. return JSON.parse(data);
  143. }
  144. }
  145. var data = yield that.objTable.getAll(where, keyWord);
  146. // 以下内容只是用来同步redis用的,可以异步跳过
  147. var pipeline = that.objRedis.pipeline();
  148. pipeline.set(cacheKey, JSON.stringify(data));
  149. if (that.cacheInfo['ttl'] > 0) {
  150. pipeline.expire(cacheKey, that.cacheInfo['ttl']);
  151. }
  152. pipeline.exec();
  153. return data;
  154. });
  155. }
  156. /**
  157. * 增加一行数据
  158. * @param args
  159. * @param boolean updateList
  160. * @return int
  161. * @public
  162. */
  163. Redis2MySql.prototype.addObject = function addObject(args, updateList) {
  164. updateList = updateList === false ? updateList : true;
  165. var deferred = co(function*() {
  166. var ret = yield that.objTable.addObject(args);
  167. if (updateList) {
  168. that.delListCache(args);
  169. }
  170. return ret;
  171. });
  172. return deferred;
  173. }
  174. /**
  175. * 删除列表的缓存
  176. * @param where
  177. * @public
  178. */
  179. Redis2MySql.prototype.delListCache = function(where) {
  180. var cacheKey = that._getAllKey(where);
  181. var otherKey = this.tableName + ":all:others";
  182. var deferred = co(function*() {
  183. var keys = [];
  184. if (cacheKey === otherKey) {
  185. // 需要清除所有key删除
  186. cacheKey = that.tableName + ":all";
  187. keys = yield that.objRedis.keys(cacheKey + '*');
  188. } else {
  189. // 除了删除当前keys,还需要删除others
  190. keys = yield that.objRedis.keys(cacheKey + '*');
  191. var keys1 = yield that.objRedis.keys(otherKey + '*');
  192. keys = php.array_merge(keys, keys1);
  193. }
  194. if (keys.length > 0) {
  195. that.objRedis.del(keys);
  196. }
  197. return 1;
  198. });
  199. return deferred;
  200. }
  201. /**
  202. * 设置行的缓存
  203. * @param args
  204. * @returns {Array|{index: number, input: string}|*|{arity, flags, keyStart, keyStop, step}}
  205. * @private
  206. */
  207. Redis2MySql.prototype._setRowCache = function(args) {
  208. var cacheKey = this._getRowKey(args);
  209. if (!cacheKey) {
  210. var msg = "没设置key:cacheKey," . JSON.stringify(this.cacheInfo);
  211. throw new Error(msg, CODE_REDIS_ERROR);
  212. }
  213. var pipeline = this.objRedis.pipeline();
  214. pipeline.hmset(cacheKey, args);
  215. if (this.cacheInfo['ttl'] > 0) {
  216. pipeline.expire(cacheKey, this.cacheInfo['ttl']);
  217. }
  218. return pipeline.exec();
  219. }
  220. /**
  221. * 更新行缓存
  222. * @param args
  223. * @returns {*}
  224. * @private
  225. */
  226. Redis2MySql.prototype._updateRowCache = function(args) {
  227. var cacheKey = this._getRowKey(args);
  228. if (!cacheKey) {
  229. var msg = "没设置key:cacheKey," . JSON.stringify(this.cacheInfo);
  230. throw new Error(msg, CODE_REDIS_ERROR);
  231. }
  232. var deferred = co(function *() {
  233. var flag = yield that.objRedis.exists(cacheKey);
  234. if (flag == 1) {
  235. var pipeline = that.objRedis.pipeline();
  236. pipeline.hmset(cacheKey, args);
  237. if (that.cacheInfo['ttl'] > 0) {
  238. pipeline.expire(cacheKey, that.cacheInfo['ttl']);
  239. }
  240. yield pipeline.exec();
  241. return 1;
  242. }
  243. });
  244. return deferred;
  245. }
  246. /**
  247. * 修改一个key的数据
  248. * @param args 更新的内容
  249. * @param where 更新的条件
  250. * @param boolean updateList
  251. * @return int 影响行数
  252. * @public
  253. */
  254. Redis2MySql.prototype.updateObject = function(args, where, updateList) {
  255. updateList = updateList === false ? updateList : true;
  256. var deferred = co(function*() {
  257. var result = yield that.objTable.updateObject(args, where);
  258. args = php.array_merge(args, where);
  259. if (result.affectedRows > 0) {
  260. yield that._updateRowCache(args);
  261. if (updateList) {
  262. that.delListCache(where);
  263. }
  264. }
  265. return result;
  266. });
  267. return deferred;
  268. }
  269. /**
  270. * 设置一个key的数据
  271. * @param args
  272. * @param boolean updateList
  273. * @return int 影响行数
  274. * @public
  275. */
  276. Redis2MySql.prototype.replaceObject = function(args, updateList) {
  277. var deferred = co(function*() {
  278. var result = yield that.objTable.replaceObject(args);
  279. yield that._setRowCache(args);
  280. if (updateList) {
  281. that.delListCache(args);
  282. }
  283. return result;
  284. });
  285. return deferred;
  286. }
  287. /**
  288. * 删除数据
  289. * @param array $where
  290. * @throws RedisException
  291. * @return unknown
  292. * @public
  293. */
  294. Redis2MySql.prototype.delObject = function(where, updateList) {
  295. updateList = updateList === false ? updateList : true;
  296. var deferred = co(function*() {
  297. var result = yield that.objTable.delObject(where);
  298. that._delRowCache(where);
  299. if (updateList) {
  300. that.delListCache(where);
  301. }
  302. return result;
  303. });
  304. return deferred;
  305. }
  306. /**
  307. * 删除行的缓存
  308. * @param where
  309. * @throws RedisException
  310. * @private
  311. */
  312. Redis2MySql.prototype._delRowCache = function(where) {
  313. var cacheKey = this._getRowKey(where);
  314. if (!cacheKey) {
  315. var msg = "没设置key:cacheKey," . JSON.stringify(this.cacheInfo);
  316. throw new Error(msg, CODE_REDIS_ERROR);
  317. }
  318. return this.objRedis.del(cacheKey);
  319. }
  320. /**
  321. * 关闭连接
  322. * @public
  323. */
  324. Redis2MySql.prototype.close = function() {
  325. if (this.objTable) {
  326. this.objTable.close();
  327. }
  328. if (this.objRedis) {
  329. this.objRedis.disconnect();
  330. }
  331. }
  332. module.exports = Redis2MySql;