Browse Source

调度逻辑

solupro 5 years ago
commit
5d34531ba7
6 changed files with 240 additions and 0 deletions
  1. 13 0
      bots.json
  2. 1 0
      eos.go
  3. 21 0
      main.go
  4. 99 0
      queue.go
  5. 87 0
      runner.go
  6. 19 0
      utils.go

+ 13 - 0
bots.json

@@ -0,0 +1,13 @@
+[
+  "bbbbb1111111",
+  "woainimama11",
+  "oppooppoopp1",
+  "youjumoijump",
+  "uziuziwinno1",
+  "paragonronl2",
+  "iqksosjhgyus",
+  "jjjjjjjjiiii",
+  "4tqibd3wmnzg",
+  "jvwh4d1ydfkw",
+  "5m2qzd3nz11u"
+]

+ 1 - 0
eos.go

@@ -0,0 +1 @@
+package main

+ 21 - 0
main.go

@@ -0,0 +1,21 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"io/ioutil"
+)
+
+func main() {
+	var configPath string
+	flag.StringVar(&configPath, "conf", "./bots.json", "bots config path")
+	flag.Parse()
+
+	f, err := ioutil.ReadFile(configPath)
+	CheckError(err)
+	bots := []string{}
+	err = json.Unmarshal(f, &bots)
+	CheckError(err)
+
+	Run(bots)
+}

+ 99 - 0
queue.go

@@ -0,0 +1,99 @@
+package main
+
+import "sync"
+
+type queue struct {
+	buf    []interface{}
+	length int
+	mux    *sync.RWMutex
+}
+
+func newQueue() *queue {
+	return &queue{make([]interface{}, 0), 0, &sync.RWMutex{}}
+}
+
+func (q *queue) rPush(e interface{}) {
+	q.mux.Lock()
+	defer q.mux.Unlock()
+
+	q.buf = append(q.buf, e)
+	q.length += 1
+
+}
+
+func (q *queue) rPushs(elements ...interface{}) {
+	q.mux.Lock()
+	defer q.mux.Unlock()
+
+	q.buf = append(q.buf, elements...)
+	q.length += len(elements)
+}
+
+func (q *queue) lPush(e interface{}) {
+	q.mux.Lock()
+	defer q.mux.Unlock()
+
+	q.buf = append([]interface{}{e}, q.buf...)
+	q.length += 1
+}
+
+func (q *queue) rPop() interface{} {
+	q.mux.Lock()
+
+	if q.length == 0 {
+		return nil
+	}
+
+	n := q.buf[q.length-1]
+
+	q.buf = q.buf[:q.length-1]
+	q.length -= 1
+
+	q.mux.Unlock()
+	return n
+}
+
+func (q *queue) lPop() interface{} {
+	q.mux.Lock()
+
+	if q.length == 0 {
+		return nil
+	}
+
+	n := q.buf[0]
+
+	q.buf = q.buf[1:]
+	q.length -= 1
+
+	q.mux.Unlock()
+	return n
+}
+
+func (q *queue) lGet() interface{} {
+	q.mux.RLock()
+	defer q.mux.RUnlock()
+
+	if q.length == 0 {
+		return nil
+	}
+
+	return q.buf[0]
+}
+
+func (q *queue) rGet() interface{} {
+	q.mux.RLock()
+	defer q.mux.RUnlock()
+
+	if q.length == 0 {
+		return nil
+	}
+
+	return q.buf[q.length-1]
+}
+
+func (q *queue) getLen() int {
+	q.mux.RLock()
+	defer q.mux.RUnlock()
+
+	return q.length
+}

+ 87 - 0
runner.go

@@ -0,0 +1,87 @@
+package main
+
+import (
+	"github.com/sirupsen/logrus"
+	"math/rand"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	MAX_RUN    = 3
+	MIN_RUN    = 1
+	TIME_CHECK = 10 * time.Second // 没十分钟检查一次
+)
+
+var BotQueue *queue
+var Working int64
+
+func init() {
+	BotQueue = newQueue()
+}
+
+func Run(bots []string) {
+	StringShuffle(bots)
+
+	for _, bot := range bots {
+		BotQueue.rPush(bot)
+	}
+	logrus.Info("bots queue init")
+
+	ticker := time.NewTicker(TIME_CHECK)
+	defer ticker.Stop()
+
+	botCh := make(chan string)
+	master(botCh)
+
+	for {
+		select {
+		case <-ticker.C:
+			master(botCh)
+
+		case exitBot := <-botCh:
+			logrus.Warnf("bot: %s exit!", exitBot)
+			atomic.AddInt64(&Working, -1)
+			BotQueue.rPush(exitBot) // 退出机器人重新加到队尾
+
+		}
+	}
+}
+
+func master(botCh chan<- string) {
+	logrus.Info("check running worker")
+
+	running := atomic.LoadInt64(&Working)
+	logrus.Infof("running %d bots, queue len: %d", running, BotQueue.getLen())
+
+	if running <= MIN_RUN {
+		need := MAX_RUN - running
+
+		var i int64
+		for ; i < need; i++ {
+			bot := BotQueue.lPop()
+			if bot == nil {
+				logrus.Error("bot queue empty!!!")
+				break
+			}
+			go worker(bot.(string), botCh)
+		}
+	}
+}
+
+func worker(bot string, botCh chan<- string) {
+	logrus.Infof("bot: %s running~", bot)
+	atomic.AddInt64(&Working, 1)
+	for {
+		rand.Seed(time.Now().UnixNano())
+		c := rand.Intn(5)
+
+		logrus.Infof("bot: %s calc %d", bot, c)
+		if c == 3 {
+			break
+		}
+		time.Sleep(2 * time.Second)
+	}
+
+	botCh <- bot
+}

+ 19 - 0
utils.go

@@ -0,0 +1,19 @@
+package main
+
+import (
+	"math/rand"
+	"time"
+)
+
+func StringShuffle(list []string) {
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(list), func(i, j int) {
+		list[i], list[j] = list[j], list[i]
+	})
+}
+
+func CheckError(err error) {
+	if err != nil {
+		panic(err)
+	}
+}