// message_model.dart (4.7 KB)
  1. import 'dart:async';
  2. import 'dart:convert';
  3. import 'package:flutter/cupertino.dart';
  4. import 'package:shared_preferences/shared_preferences.dart';
  5. import 'package:sport/bean/message.dart';
  6. import 'package:sport/db/message_db.dart';
  7. import 'package:sport/services/api/inject_api.dart';
  /// Polls the message API on a fixed interval, stores new messages through
  /// [MessageDB] and republishes each stored batch on [queryStream].
  class MessageModel extends ChangeNotifier with InjectApi {
    /// Cursor returned by the most recent poll; null until a poll has completed.
    int _curId;

    /// Messages returned by the most recent poll.
    List<MessageInstance> _messages;

    /// Application lifecycle state. Polling is skipped while this equals
    /// [AppLifecycleState.paused]. Nothing in this class assigns it.
    AppLifecycleState state;

    final StreamController<List<MessageInstance>> _queryController =
        StreamController.broadcast();

    /// Broadcast stream of the message batches handed to [get].
    Stream<List<MessageInstance>> get queryStream => _queryController.stream;

    /// Event subscription handle. Nothing in this class assigns it.
    StreamSubscription dataSubscription;

    /// Subscription to the 10-second polling timer created by [init].
    StreamSubscription periodicSubscription;

    /// Starts polling every 10 seconds.
    ///
    /// Any timer left over from an earlier call is cancelled first so that
    /// repeated calls do not stack pollers.
    init() {
      periodicSubscription?.cancel();
      periodicSubscription = Stream.periodic(const Duration(seconds: 10))
          .listen((event) => _poll());
    }

    /// Runs one poll cycle: fetches messages newer than [_curId] and stores them.
    ///
    /// Does nothing while the app is paused, when no login token is saved, or
    /// when the response carries no messages.
    Future<void> _poll() async {
      if (state == AppLifecycleState.paused) {
        return;
      }

      final prefs = await SharedPreferences.getInstance();
      final String token = prefs.getString("token");
      if (token == null || token.isEmpty) {
        return;
      }

      // The two call shapes are kept distinct: omitting curId lets the API
      // method apply whatever default it declares.
      final response = _curId != null
          ? await api.getMessageForPoll(curId: _curId)
          : await api.getMessageForPoll();
      final Message data = response.data;
      if (data == null) {
        return;
      }

      // The cursor must be the one returned by the API; the curId values stored
      // locally in sqlite are all 0.
      _curId = data.curId;
      _messages = data.messages;

      // Nothing to store when the poll returned no new messages. The original
      // `return` had no terminating semicolon and swallowed the add() call
      // below, so add() ran only in the empty case.
      if (_messages == null || _messages.isEmpty) {
        return;
      }

      // An earlier revision de-duplicated against the latest stored curId before
      // inserting; that check was disabled by the author and is not performed.
      await add(_messages, _curId);
    }

    /// Persists [messages] under [curId] and then broadcasts them.
    ///
    /// Every message is stored with status 1 (1 = unread, 0 = read). A user row
    /// is inserted for each sender that [MessageDB.findHasUserId] does not
    /// already return. A null [messages] is ignored.
    add(List<MessageInstance> messages, int curId) async {
      if (messages == null) {
        return;
      }

      final items = <MessageItem>[];
      // Iterate the argument rather than the `_messages` field so the method
      // stores exactly what the caller passed in.
      for (final instance in messages) {
        final senderId = instance.fromUser.id;
        items.add(MessageItem(
            message: instance, curId: curId, status: 1, userId: senderId));

        final known = await MessageDB().findHasUserId(senderId);
        if (known.isEmpty) {
          await MessageDB()
              .insertUser(UserTableInfo(userId: senderId, isTop: 0));
        }
      }
      await MessageDB().insertAll(items);

      // Publish the batch to queryStream listeners.
      get(messages);
    }

    /// Pushes [list] onto [queryStream].
    get(List<MessageInstance> list) {
      _queryController.add(list);
    }

    /// Cancels the polling timer and the data subscription, if present.
    ///
    /// The original left the controller's `close()` call commented out; that is
    /// preserved here, so [queryStream] stays open.
    close() {
      dataSubscription?.cancel();
      dataSubscription = null;
      periodicSubscription?.cancel();
      periodicSubscription = null;
    }
  }
  /// Common interface implemented by every BLoC in this file.
  abstract class BlocBase {
    /// Releases whatever the bloc holds.
    ///
    /// [_BlocProviderState.dispose] calls this on the provided bloc.
    void dispose();
  }
  /// Generic BLoC provider: holds a [bloc] and exposes it to descendants of
  /// [child] through [BlocProvider.of].
  class BlocProvider<T extends BlocBase> extends StatefulWidget {
    BlocProvider({
      Key key,
      @required this.child,
      @required this.bloc,
    }) : super(key: key);

    /// The bloc made available to the subtree.
    final T bloc;

    /// The widget subtree rendered below this provider.
    final Widget child;

    @override
    _BlocProviderState<T> createState() => _BlocProviderState<T>();

    /// Returns the bloc of the nearest `BlocProvider<T>` ancestor of [context].
    ///
    /// In debug builds a missing provider fails an assertion with a descriptive
    /// message. In release builds the null dereference of the original
    /// implementation is unchanged.
    static T of<T extends BlocBase>(BuildContext context) {
      // NOTE(review): newer Flutter releases replace ancestorWidgetOfExactType
      // with findAncestorWidgetOfExactType<T>(); switch once the project's SDK
      // allows it.
      final type = _typeOf<BlocProvider<T>>();
      final BlocProvider<T> provider = context.ancestorWidgetOfExactType(type);
      assert(
          provider != null,
          'BlocProvider.of<$T>() was called with a context that has no '
          'BlocProvider<$T> ancestor.');
      return provider.bloc;
    }

    /// Turns a type argument into a [Type] value for the ancestor lookup.
    static Type _typeOf<T>() => T;
  }
  /// State object behind [BlocProvider]: shows the provider's child and
  /// disposes the provided bloc when the state itself is disposed.
  class _BlocProviderState<T> extends State<BlocProvider<BlocBase>> {
    /// Renders the provider's child unchanged.
    @override
    Widget build(BuildContext context) => widget.child;

    /// Disposes the bloc first, then the state.
    @override
    void dispose() {
      final providedBloc = widget.bloc;
      providedBloc.dispose();
      super.dispose();
    }
  }
  /// BLoC that publishes [Message] values on [outCounter].
  class MessageBloc implements BlocBase {
    /// Most recent [Message] passed to [addOperate]; null until one is supplied.
    Message _message;

    /// Controller backing [outCounter].
    final StreamController<Message> _counterController =
        StreamController<Message>();

    /// Input side of the stream.
    StreamSink<Message> get _addOperate => _counterController.sink;

    /// Output side of the stream.
    Stream<Message> get outCounter => _counterController.stream;

    /// Closes the underlying stream controller.
    @override
    void dispose() {
      _counterController.close();
    }

    /// Publishes [data] on [outCounter].
    ///
    /// When [data] is a [Message] it becomes the current message and is emitted.
    /// The original ignored [data] and always emitted the never-assigned
    /// `_message` field, i.e. null. Any other argument leaves the current
    /// message unchanged, and that current message is emitted again, as before.
    void addOperate(data) {
      if (data is Message) {
        _message = data;
      }
      _addOperate.add(_message);
    }
  }