Base.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. <?php
  2. /**
  3. * Base class for synchronizing the rows of an on-chain (EOS contract) table into a local database table.
  4. * User: ben
  5. */
  6. class Sync_Base extends Singleton {
  // Redis client; assigned in the constructor from dwRedis::init($this->cacheKey).
  7. protected $objRedis;
  // Helper for the local table; assigned in the constructor as new TableHelper($this->tableName, $this->dbKey).
  8. protected $objTable;
  9. /**
  10. * Contract account name. It is passed to cleos for both account arguments of
  *     `get table` and as the account/permission of `push action`.
  *     The constructor falls back to $GLOBALS['codeMee'] when none is given.
  11. * @var string
  12. */
  13. protected $code;
  14. /**
  15. * Local database table name (also used to build the Redis keys).
  16. * @var string
  17. */
  18. protected $tableName;
  19. /**
  20. * Contract table name; defaults to the same value as tableName.
  21. * @var string
  22. */
  23. protected $codeTable;
  24. /**
  25. * Cache key handed to dwRedis::init() / dwRedis::initNewOne().
  26. * @var string
  27. */
  28. protected $cacheKey = 'default';
  29. /**
  30. * Database key handed to TableHelper.
  31. * @var string
  32. */
  33. protected $dbKey = 'default';
  34. /**
  35. * Field name of the primary key.
  36. * @var string
  37. */
  38. protected $priKey;
  39. /**
  40. * Key data fields (the primary key does not need to be listed).
  *     When non-empty, updateRow() only writes a row if one of these fields differs.
  41. * @var array
  42. */
  43. protected $dataField = [];
  44. /**
  45. * Field name of the index.
  46. * @var string
  47. */
  48. protected $indexName;
  49. /**
  50. * Value for cleos --index. Index number, 1 - primary (first), 2 - secondary index (in order defined by multi_index), 3 - third index, etc.
  51. * @var int
  52. */
  53. protected $index = 1;
  54. /**
  55. * Value for cleos --key-type. The key type of --index, primary only supports (i64), all others support (i64, i128, i256, float64, float128, ripemd160, sha256).
  56. * @var string
  57. */
  58. protected $keyType = 'i64';
  59. /**
  60. * Sync frequency: how many seconds between syncs.
  *     Also used as the TTL of the request key and as the threshold in isTimeout().
  61. * @var int
  62. */
  63. protected $frequency = 10;
  64. /**
  65. * Interval between data clears, used as the TTL (seconds) of the clear marker key.
  *     tryClearData() does nothing unless this is greater than 0.
  66. * @var int
  67. */
  68. protected $clearTtl = -1;
  69. /**
  70. * Name of the contract action that clears data; clearData() throws when it is not set.
  71. * @var string|null
  72. */
  73. protected $clearAction = null;
  74. /**
  75. * Maximum number of executions: bounds the recursion depth of sync() and scales the
  *     subscriber read timeout in syncDaemon().
  76. * @var int
  77. */
  78. protected $maxTimes = 1000;
  79. public function __construct($tableName, $priKey = 'id', $indexName = 'id', $codeTable = null, $code = null) {
  80. $this->tableName = $tableName;
  81. $this->codeTable = $codeTable ?: $tableName;
  82. $this->priKey = $priKey;
  83. $this->indexName = $indexName;
  84. $this->code = $code ?: $GLOBALS['codeMee'];
  85. $this->objRedis = dwRedis::init($this->cacheKey);
  86. $this->objTable = new TableHelper($this->tableName, $this->dbKey);
  87. }
  88. private static function getPreKey($tableName) {
  89. return "globals:syncEosTable:{$tableName}:";
  90. }
  91. private static function _getClearKey($tableName) {
  92. return "globals:clearEosTable:{$tableName}";
  93. }
  94. protected static function getRequestKey($tableName) {
  95. return self::getPreKey($tableName) . 'request';
  96. }
  97. protected static function getResponseKey($tableName) {
  98. return self::getPreKey($tableName) . 'response';
  99. }
  100. public function isTimeout($time) {
  101. $responseKey = self::getResponseKey($this->tableName);
  102. $runTime = $this->objRedis->get($responseKey);
  103. $span = $time - $runTime;
  104. if ($span >= $this->frequency) {
  105. $msg = "Sync {$this->code}.{$this->codeTable} => {$this->dbKey}.{$this->tableName} timeout, time:{$time}, runTime:{$runTime}, span:{$span} > frequency:{$this->frequency}";
  106. Eos::log($msg);
  107. if ($runTime && $span > $this->frequency * 5 && $span > 10) {
  108. alermErrorMsg($msg);
  109. }
  110. return true;
  111. } else {
  112. return false;
  113. }
  114. }
  115. public function pubSubscribe($waitNewer = true, $all = false) {
  116. $time = microtime(true);
  117. $requestKey = self::getRequestKey($this->tableName);
  118. $this->objRedis->setex($requestKey, $this->frequency, $time);
  119. // Eos::log("time:{$time}");
  120. Eos::pubEvent($requestKey, compact('time', 'all'));
  121. if ($waitNewer) {
  122. // 最多等10秒
  123. for ($i = 0; $i < 100; $i++) {
  124. $responseKey = self::getResponseKey($this->tableName);
  125. $runTime = $this->objRedis->get($responseKey);
  126. if ($runTime > $time) {
  127. // Eos::log("runTime:{$runTime} > time:{$time}");
  128. return true;
  129. } else {
  130. // Eos::log("sleep, runTime:{$runTime} < time:{$time}");
  131. // 休息 100ms
  132. usleep(100 * 1000);
  133. }
  134. }
  135. }
  136. return false;
  137. }
  138. public function getNewestDatas($lower = null, $limit = 1000) {
  139. $cmd = "cleos -u {$GLOBALS['eosUrl']} get table {$this->code} {$this->code} {$this->codeTable} --index {$this->index} --key-type {$this->keyType} -l {$limit} ";
  140. if ($lower) {
  141. $cmd .= " -L '{$lower}' ";
  142. }
  143. $json = Eos::execCmd($cmd);
  144. $datas = json_decode($json, true);
  145. Eos::log("get {$this->code} {$this->codeTable} lower:{$lower}, count:" . count($datas['rows']));
  146. return $datas;
  147. }
  148. protected function updateRow($row, $priValue, $dbRow) {
  149. if ($this->dataField) {
  150. $newRow = arrayFilter($row, $this->dataField);
  151. $needUpdate = false;
  152. foreach ($newRow as $k => $v) {
  153. if ($dbRow[$k] != $v) {
  154. $needUpdate = true;
  155. break;
  156. }
  157. }
  158. // 不需要更新,则跳过
  159. if (!$needUpdate) {
  160. // Eos::log("skip same row, priKey:{$dbRow[$this->priKey]}, dbRow:" . json_encode($dbRow));
  161. return;
  162. }
  163. }
  164. $this->objTable->updateObject($row, [$this->priKey => $priValue]);
  165. Eos::log("update {$this->tableName}, dbRow:" . json_encode($dbRow));
  166. Eos::log("update {$this->tableName}, new row:" . json_encode($row));
  167. }
  168. protected function addNewRows($newRows) {
  169. // 批量添加
  170. Eos::log("addObjectsIfNoExist {$this->tableName} count:" . count($newRows));
  171. foreach ($newRows as $newRow) {
  172. Eos::log("addNewRow {$this->tableName}: new row:" . json_encode($newRow));
  173. }
  174. $this->objTable->addObjectsIfNoExist($newRows);
  175. }
  176. protected function tryClearData() {
  177. if ($this->clearTtl > 0) {
  178. $key = $this->_getClearKey($this->tableName);
  179. $flag = $this->objRedis->exists($key);
  180. Eos::log("exists ClearKey:{$flag}");
  181. if (!$flag) {
  182. $flag2 = $this->clearData();
  183. if ($flag2) {
  184. $this->objRedis->setex($key, $this->clearTtl, time());
  185. }
  186. }
  187. }
  188. }
  189. /**
  190. * 清除数据
  191. */
  192. protected function clearData() {
  193. if ($this->clearAction) {
  194. // 取倒数第20条
  195. $_sortKey = "update_time_int DESC";
  196. $last = $this->objTable->getRow(compact('_sortKey'));
  197. $update_time_int = $last['update_time_int'];
  198. $maxUpdateTime = 1000000 * (time() - 180);
  199. $update_time_int = max($update_time_int, $maxUpdateTime);
  200. // 清除数据
  201. $cmd = "cleos -u {$GLOBALS['eosUrl']} push action {$this->code} {$this->clearAction} '[ \"{$update_time_int}\" ]' -j -p {$this->code}";
  202. $json = Eos::execCmd($cmd);
  203. $ret = json_decode($json, true);
  204. Eos::log($cmd);
  205. Eos::log("transaction_id: {$ret['transaction_id']}");
  206. return $ret['transaction_id'];
  207. } else {
  208. throw new Exception("{$this->tableName} have not clearAction");
  209. }
  210. }
  211. /**
  212. * 更新数据库,返回新增数据 和 更新的数据
  213. *
  214. * @param $rows
  215. *
  216. * @return array
  217. */
  218. protected function updateDb($rows) {
  219. if (!$rows) {
  220. return [];
  221. }
  222. $_field = "`{$this->priKey}`";
  223. if ($this->dataField) {
  224. $_field .= ", `" . join("`, `", $this->dataField) . "`";
  225. }
  226. // 考虑到有可能会丢数据,所以不能只拿合约上的数据
  227. $ids = array_column($rows, $this->priKey);
  228. // $existMap = $this->objTable->getAll(, compact('_field'));
  229. // 获取线上时间最小的时间
  230. $maxTime = time() * 1000000;
  231. foreach ($rows as $row) {
  232. $maxTime = max($row[$this->indexName], $maxTime);
  233. }
  234. $_where = "`{$this->indexName}` >= '{$maxTime}' ";
  235. if ($ids) {
  236. $_where .= " OR `{$this->priKey}` IN ('" . join("', '", $ids) . "') ";
  237. }
  238. $existMap = $this->objTable->getAll(compact('_field', '_where'));
  239. $existMap = arrayFormatKey($existMap, $this->priKey);
  240. $existMap2 = $existMap;
  241. $newDatas = [];
  242. foreach ($rows as $row) {
  243. $id = $row[$this->priKey];
  244. $dbRow = $existMap[$id];
  245. if ($dbRow) {
  246. // 匹配到,则删除
  247. unset($existMap2[$id]);
  248. // 去掉主键
  249. unset($row[$this->priKey]);
  250. $this->updateRow($row, $id, $dbRow);
  251. } else {
  252. $newDatas[] = $row;
  253. }
  254. }
  255. if ($newDatas) {
  256. // 转化回数组
  257. $this->addNewRows($newDatas);
  258. }
  259. if ($existMap2) {
  260. $objLostLog = new TableHelper('lost_log', 'dw_eos');
  261. // 要删除的多余数据...,这个是因为交易丢失造成的脏数据
  262. $lostLogs = [];
  263. $lostIds = [];
  264. $timeout = (time() - 15) * 1000000;
  265. foreach ($existMap2 as $id => $lostRow) {
  266. // 超过15秒的数据,还出现丢失,说明有问题
  267. if ($lostRow['update_time_int'] < $timeout) {
  268. $lostLogs[] = [
  269. 'db_name' => $this->dbKey,
  270. 'table_name' => $this->tableName,
  271. 'pri_key' => $id,
  272. 'data' => json_encode($lostRow),
  273. ];
  274. $lostIds[] = $id;
  275. }
  276. }
  277. if ($lostIds) {
  278. // 删除数据,并记录log
  279. $objLostLog->addObjects2($lostLogs);
  280. // 先注释删除
  281. // $this->objTable->delObject([$this->priKey => $lostIds]);
  282. // 来个告警
  283. alermErrorMsg("同步{$this->tableName}, 发现脏数据, id:" . join(',', $lostIds));
  284. }
  285. }
  286. // 添加数据的时候,尝试删除老数据
  287. $this->tryClearData();
  288. return compact('existMap', 'newDatas');
  289. }
  290. /**
  291. * 同步一次
  292. */
  293. public function syncOnce($all = false) {
  294. if ($all) {
  295. $maxIndex = null;
  296. } else {
  297. // 注意,这里最新的数据可能是不靠谱的,需要过了3分钟的不可逆的数据才靠谱
  298. $maxIndex = $this->objTable->getOne([
  299. '_field' => "`{$this->indexName}`",
  300. '_sortKey' => "`{$this->indexName}` DESC"
  301. ]);
  302. // 获取3分钟内的数据
  303. $maxIndex = min($maxIndex, (time() - 180) * 1000000);
  304. }
  305. $ret = $this->getNewestDatas($maxIndex);
  306. $this->updateDb($ret['rows']);
  307. return $ret;
  308. }
  309. /**
  310. * 守卫进程,定时同步
  311. */
  312. public function syncDaemon() {
  313. $this->sync();
  314. // 订阅事件
  315. $requestKey = self::getRequestKey($this->tableName);
  316. try {
  317. $objSubRedis = dwRedis::initNewOne($this->cacheKey);
  318. $objSubRedis->setOption(Redis::OPT_READ_TIMEOUT, $this->frequency * $this->maxTimes);
  319. $objSubRedis->subscribe([$requestKey], function($objRedis, $channel, $json) use($requestKey) {
  320. $data = json_decode($json, true);
  321. $max = (int) ($this->objRedis->get($requestKey) * 1000);
  322. $time = (int) ($data['time'] * 1000);
  323. if ($time >= $max) {
  324. // Eos::log("subscribe channel: {$channel}, all:{$data['all']}, msg: {$data['time']} >= max: {$max}");
  325. $this->sync($data['all']);
  326. } else {
  327. Eos::log("skip, msg:{$data['time']} < max:{$max}.");
  328. }
  329. });
  330. } catch (RedisException $ex) {
  331. Eos::log("timeout, ex:" . $ex->getMessage());
  332. }
  333. }
  334. /**
  335. * 守卫进程,定时同步
  336. */
  337. private function sync($all = false, $times = 0) {
  338. if ($times++ > $this->maxTimes) {
  339. return;
  340. }
  341. $runTime = microtime(true);
  342. $this->syncOnce($all);
  343. $responseKey = self::getResponseKey($this->tableName);
  344. $this->objRedis->setex($responseKey, $this->frequency * 10, $runTime);
  345. $requestKey = self::getRequestKey($this->tableName);
  346. $requestTime = $this->objRedis->get($requestKey);
  347. if ($requestTime >= $runTime) {
  348. Eos::log("find requestTime: {$requestTime}, syncDaemon at once.");
  349. $clientNum = Eos::pubEvent($responseKey, $runTime);
  350. Eos::log("pubEvent: {$responseKey}, clientNum: {$clientNum}");
  351. $this->afterSync();
  352. $this->sync($all, $times);
  353. } else {
  354. $this->afterSync();
  355. }
  356. $msg = 'sync done, memory used: ' . memory_get_usage() . ' bytes';
  357. CallLog::logSelfCall(CODE_SUCCESS, $msg);
  358. // 记录日志后,重置时间,才能统计每次循环所用的时间
  359. global $startTime;
  360. $startTime = microtime(true);
  361. }
  /**
   * Hook invoked by sync() after every sync pass. It is empty in this class;
   * presumably subclasses override it to add post-sync work.
   */
  362. protected function afterSync() {
  363. }
  364. protected function formatDateTime($str) {
  365. if (strpos($str, 'T') > 0) {
  366. return date('Y-m-d H:i:s', strtotime($str) + 8 * 3600);
  367. } else {
  368. return date('Y-m-d H:i:s', strtotime($str));
  369. }
  370. }
  371. }