From 894b62bf2980e3b138c6cc68af951f70778b84c4 Mon Sep 17 00:00:00 2001 From: Zheng Kai Date: Thu, 27 Apr 2023 16:02:58 +0800 Subject: [PATCH] up --- proto/req.proto | 5 ++-- server/src/core/fetch.go | 36 ++++++++++---------------- server/src/core/metrics.go | 6 ++++- server/src/core/pool.go | 4 +-- server/src/core/req.go | 52 +++++++++++++++++++++++-------------- server/src/core/row.go | 53 ++++++++++++++++++++++++++++---------- server/src/core/web.go | 2 +- server/src/pb/req.go | 2 +- server/src/project.go | 2 ++ server/src/web/server.go | 2 +- 10 files changed, 101 insertions(+), 63 deletions(-) diff --git a/proto/req.proto b/proto/req.proto index 09b2940..910bc59 100644 --- a/proto/req.proto +++ b/proto/req.proto @@ -4,6 +4,7 @@ package pb; message Req { string method = 1; - string path = 2; - bytes body = 3; + string url = 2; + string contentType = 3; + bytes body = 4; } diff --git a/server/src/core/fetch.go b/server/src/core/fetch.go index d8e0031..5335738 100644 --- a/server/src/core/fetch.go +++ b/server/src/core/fetch.go @@ -8,43 +8,35 @@ import ( "net/url" "project/config" "project/util" + "project/zj" "time" ) func (pr *row) fetchRemote() (ab []byte, ok bool, err error) { r := pr.req - var b bytes.Buffer - pr.failLog = &b + b := pr.log - u, _ := url.Parse(config.OpenAIBase) - u.Path = r.Path + u, err := url.Parse(config.OpenAIBase + r.Url) + if err != nil { + return nil, false, err + } + zj.J(`real url`, u.String()) req, err := http.NewRequest(r.Method, u.String(), bytes.NewReader(r.Body)) if err != nil { return } - b.WriteString(pr.hr.URL.String()) - b.WriteString("\n\nreq header:\n\n") - for k, v := range pr.hr.Header { - fmt.Fprintf(&b, "\t%s: %v\n", k, v) - } - b.WriteString("\n") - fmt.Fprintf(&b, "req body: %d\n\n", len(r.Body)) - if len(r.Body) > 0 { - fmt.Fprintf(&b, "%s\n\n", r.Body) - } - - req.Header.Set(`Content-Type`, `application/json`) + req.Header.Set(`Content-Type`, r.ContentType) req.Header.Set(`Authorization`, `Bearer `+config.OpenAIKey) client := &http.Client{ - Timeout: 30 * time.Second, + // Timeout: 30 * time.Second, } rsp, err := client.Do(req) if err != nil { - fmt.Fprintf(&b, "client.Do fail: %s\n", err.Error()) + fmt.Fprintf(b, "client.Do fail: %s\n", err.Error()) return } @@ -55,15 +47,15 @@ func (pr *row) fetchRemote() (ab []byte, ok bool, err error) { b.WriteString(err.Error()) } - b.WriteString("req header:\n\n") + b.WriteString("rsp header:\n\n") for k, v := range rsp.Header { - fmt.Fprintf(&b, "\t%s: %v\n", k, v) + fmt.Fprintf(b, "\t%s: %v\n", k, v) } b.WriteString("\n") ab, err = io.ReadAll(rsp.Body) - if err != nil { - fmt.Fprintf(&b, "rsp body: %d %v\n\n", len(ab), err) + fmt.Fprintf(b, "rsp body: %d %v\n\n", len(ab), err) + if err == nil { b.Write(ab) } diff --git a/server/src/core/metrics.go b/server/src/core/metrics.go index b9cb39a..d739d5e 100644 --- a/server/src/core/metrics.go +++ b/server/src/core/metrics.go @@ -14,7 +14,11 @@ func doMetrics(ab []byte, cached bool, r *http.Request) { metrics.RspBytes(len(ab)) o := &pb.Rsp{} - json.Unmarshal(ab, o) + err := json.Unmarshal(ab, o) + if err != nil { + zj.J(`unmarshal fail`, err) + return + } u := o.GetUsage() if u == nil { diff --git a/server/src/core/pool.go b/server/src/core/pool.go index 584a3c9..919ec29 100644 --- a/server/src/core/pool.go +++ b/server/src/core/pool.go @@ -13,7 +13,7 @@ func (c *Core) add(req *pb.Req, hr *http.Request) (pr *row, cached bool) { c.mux.Lock() pr, ok := c.pool[hash] - if ok { + if false && ok { zj.F(`hit %x`, hash) c.mux.Unlock() cached = true @@ -27,7 +27,7 @@ func (c *Core) add(req *pb.Req, hr *http.Request) (pr *row, cached bool) { t: time.Now(), } pr.mux.Lock() - pr.run() + go pr.run() c.pool[hash] = pr c.mux.Unlock() return diff --git a/server/src/core/req.go b/server/src/core/req.go index f1d3ff1..5784865 100644 --- a/server/src/core/req.go +++ b/server/src/core/req.go @@ -9,20 +9,30 @@ import ( ) func (c *Core) getAB(p *pb.Req, r *http.Request) (ab []byte, cached bool, err error) { - ab, ok := tryCache(p) - if ok { - cached = true - return + + canCache := p.Method != http.MethodGet && p.Method != http.MethodDelete + + canCache = false + + if canCache { + var ok bool + ab, ok = tryCache(p) + if ok { + cached = true + return + } } pr, cached := c.add(p, r) - go func() { - reqFile := util.CacheName(p.Hash()) + `-req.json` - if !util.FileExists(reqFile) { - util.WriteFile(reqFile, p.Body) - } - }() + if canCache { + go func() { + reqFile := util.CacheName(p.Hash()) + `-req.json` + if !util.FileExists(reqFile) { + util.WriteFile(reqFile, p.Body) + } + }() + } pr.wait() @@ -33,27 +43,31 @@ func (c *Core) getAB(p *pb.Req, r *http.Request) (ab []byte, cached bool, err er func req(w http.ResponseWriter, r *http.Request) (p *pb.Req, err error) { - path := r.URL.Path + url := r.URL.String() method := r.Method + contentType := r.Header.Get(`Content-Type`) + if contentType == `` { + contentType = `application/json` + } - if path == `/favicon.ico` { + if url == `/favicon.ico` { err = errSkip return } - ab, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1024*1024)) + ab, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1024*1024*10)) + go util.WriteFile(`last-req.json`, ab) if err != nil { return } - go util.WriteFile(`last-req.json`, ab) - p = &pb.Req{ - Path: path, - Method: method, - Body: ab, + Url: url, + Method: method, + ContentType: contentType, + Body: ab, } - zj.F(`%x %s %s`, p.Hash(), method, path) + zj.F(`%x %s %s %s`, p.Hash(), method, url, contentType) return } diff --git a/server/src/core/row.go b/server/src/core/row.go index a804313..62ded2a 100644 --- a/server/src/core/row.go +++ b/server/src/core/row.go @@ -6,36 +6,40 @@ import ( "net/http" "project/pb" "project/util" - "project/zj" "sync" "time" ) type row struct { - hash [16]byte - hr *http.Request - req *pb.Req - rsp []byte - err error - done bool - t time.Time - mux sync.RWMutex - failLog *bytes.Buffer + hash [16]byte + hr *http.Request + req *pb.Req + rsp []byte + err error + done bool + t time.Time + mux sync.RWMutex + log *bytes.Buffer } func (pr *row) run() { pr.t = time.Now() - s := fmt.Sprintf(`%x, %s`, pr.hash, pr.t.Format(`2006-01-02 15:04:05`)) - zj.J(`new`, s) + pr.startLog() + + if pr.req.Method != http.MethodGet { + // pr.mux.Unlock() + // return + } var ok bool pr.rsp, ok, pr.err = pr.fetchRemote() if pr.err == nil && ok { - pr.failLog.Reset() + // pr.failLog.Reset() + go writeFailLog(pr.hash, pr.log.Bytes()) } else { - go writeFailLog(pr.hash, pr.failLog.Bytes()) + go writeFailLog(pr.hash, pr.log.Bytes()) } go pr.saveFile() @@ -56,3 +60,24 @@ func (pr *row) saveFile() { rspFile := rspCacheFile(pr.req) util.WriteFile(rspFile, pr.rsp) } + +func (pr *row) startLog() { + + var b bytes.Buffer + pr.log = &b + + b.WriteString(time.Now().Format("2006-01-02 15:04:05.000\n")) + b.WriteString(pr.hr.Method + ` ` + pr.hr.URL.String()) + b.WriteString("\n\nreq header:\n\n") + for k, v := range pr.hr.Header { + fmt.Fprintf(&b, "\t%s: %v\n", k, v) + } + b.WriteString("\n") + + body := pr.req.Body + fmt.Fprintf(&b, "req body: %d\n\n", len(body)) + if len(body) > 0 { + b.Write(body) + b.WriteString("\n\n") + } +} diff --git a/server/src/core/web.go b/server/src/core/web.go index 1d708d7..92f1948 100644 --- a/server/src/core/web.go +++ b/server/src/core/web.go @@ -10,7 +10,7 @@ import ( var errSkip = errors.New(`skip`) // WebHandle ... -func (c *Core) WebHandle(w http.ResponseWriter, r *http.Request) { +func (c *Core) ServeHTTP(w http.ResponseWriter, r *http.Request) { p, err := req(w, r) if err != nil { diff --git a/server/src/pb/req.go b/server/src/pb/req.go index 86df348..333fcde 100644 --- a/server/src/pb/req.go +++ b/server/src/pb/req.go @@ -12,7 +12,7 @@ func (x *Req) Hash() [16]byte { io.WriteString(m, x.Method) m.Write([]byte{0x00}) - io.WriteString(m, x.Method) + io.WriteString(m, x.Url) m.Write([]byte{0x00}) m.Write(x.Body) diff --git a/server/src/project.go b/server/src/project.go index 7418057..85f50e3 100644 --- a/server/src/project.go +++ b/server/src/project.go @@ -16,6 +16,8 @@ func Start() { zj.Init() + // zj.J(`key`, config.OpenAIKey) + // tmptest.Test() go web.Server() diff --git a/server/src/web/server.go b/server/src/web/server.go index e4ef2b5..a43a63b 100644 --- a/server/src/web/server.go +++ b/server/src/web/server.go @@ -16,7 +16,7 @@ func Server() { mux := http.NewServeMux() mux.Handle(`/_metrics`, promhttp.Handler()) - mux.HandleFunc(`/`, core.NewCore().WebHandle) + mux.Handle(`/`, core.NewCore()) s := &http.Server{ Addr: config.WebAddr,