diff --git a/server/src/core/core.go b/server/src/core/core.go index 82bfac3..a0e7000 100644 --- a/server/src/core/core.go +++ b/server/src/core/core.go @@ -4,8 +4,9 @@ import "sync" // Core ... type Core struct { - pool map[[16]byte]*row - mux sync.Mutex + serial int + pool map[[16]byte]*row + mux sync.Mutex } // NewCore ... diff --git a/server/src/core/fetch.go b/server/src/core/fetch.go index ea0cac3..3524835 100644 --- a/server/src/core/fetch.go +++ b/server/src/core/fetch.go @@ -11,14 +11,14 @@ import ( "time" ) -func (pr *row) fetchRemote() (ab []byte, ok bool, err error) { +func (pr *row) fetchRemote() (ab []byte, err error) { r := pr.req b := pr.log u, err := url.Parse(config.OpenAIBase + r.Url) if err != nil { - return nil, false, err + return nil, err } // zj.J(`real url`, u.String()) @@ -39,9 +39,7 @@ func (pr *row) fetchRemote() (ab []byte, ok bool, err error) { return } - if rsp.StatusCode >= 200 || rsp.StatusCode < 300 { - ok = true - } else { + if rsp.StatusCode < 200 || rsp.StatusCode >= 300 { err = fmt.Errorf(`status code fail: %d`, rsp.StatusCode) b.WriteString(err.Error()) } diff --git a/server/src/core/pool.go b/server/src/core/pool.go index 6c3eaa7..81d0944 100644 --- a/server/src/core/pool.go +++ b/server/src/core/pool.go @@ -19,11 +19,14 @@ func (c *Core) add(req *pb.Req, hr *http.Request) (pr *row, cached bool) { return } + c.serial++ pr = &row{ - hr: hr, - hash: hash, - req: req, - t: time.Now(), + serial: c.serial, + hr: hr, + hash: hash, + req: req, + t: time.Now(), + core: c, } pr.mux.Lock() go pr.run() @@ -31,3 +34,12 @@ func (c *Core) add(req *pb.Req, hr *http.Request) (pr *row, cached bool) { c.mux.Unlock() return } + +func (c *Core) delete(r *row) { + c.mux.Lock() + pr, ok := c.pool[r.hash] + if ok && pr == r { + delete(c.pool, r.hash) + } + c.mux.Unlock() +} diff --git a/server/src/core/row.go b/server/src/core/row.go index 62ded2a..2037c79 100644 --- a/server/src/core/row.go +++ b/server/src/core/row.go @@ -11,15 +11,22 @@ import ( ) type row struct { - hash [16]byte - hr *http.Request - req *pb.Req - rsp []byte - err error - done bool - t time.Time - mux sync.RWMutex - log *bytes.Buffer + serial int + hash [16]byte + hr *http.Request + req *pb.Req + rsp []byte + err error + done bool + t time.Time + mux sync.RWMutex + log *bytes.Buffer + core *Core +} + +func (pr *row) suicide() { + pr.core.delete(pr) + pr.core = nil } func (pr *row) run() { @@ -33,20 +40,20 @@ func (pr *row) run() { // return } - var ok bool - pr.rsp, ok, pr.err = pr.fetchRemote() - if pr.err == nil && ok { - // pr.failLog.Reset() - go writeFailLog(pr.hash, pr.log.Bytes()) - } else { - go writeFailLog(pr.hash, pr.log.Bytes()) - } - - go pr.saveFile() - // go pr.metrics() - + pr.rsp, pr.err = pr.fetchRemote() pr.done = true pr.mux.Unlock() + + // go pr.metrics() + + if pr.err == nil { + // pr.failLog.Reset() + // go writeFailLog(pr.hash, pr.log.Bytes()) + go pr.saveFile() + } else { + go writeFailLog(pr.hash, pr.log.Bytes()) + go pr.suicide() + } } func (pr *row) wait() {