This commit is contained in:
Zheng Kai
2023-04-27 16:02:58 +08:00
parent 92b017d0e1
commit 894b62bf29
10 changed files with 101 additions and 63 deletions

View File

@@ -4,6 +4,7 @@ package pb;
message Req { message Req {
string method = 1; string method = 1;
string path = 2; string url = 2;
bytes body = 3; string contentType = 3;
bytes body = 4;
} }

View File

@@ -8,43 +8,35 @@ import (
"net/url" "net/url"
"project/config" "project/config"
"project/util" "project/util"
"project/zj"
"time" "time"
) )
func (pr *row) fetchRemote() (ab []byte, ok bool, err error) { func (pr *row) fetchRemote() (ab []byte, ok bool, err error) {
r := pr.req r := pr.req
var b bytes.Buffer b := pr.log
pr.failLog = &b
u, _ := url.Parse(config.OpenAIBase) u, err := url.Parse(config.OpenAIBase + r.Url)
u.Path = r.Path if err != nil {
return nil, false, err
}
zj.J(`real url`, u.String())
req, err := http.NewRequest(r.Method, u.String(), bytes.NewReader(r.Body)) req, err := http.NewRequest(r.Method, u.String(), bytes.NewReader(r.Body))
if err != nil { if err != nil {
return return
} }
b.WriteString(pr.hr.URL.String()) req.Header.Set(`Content-Type`, r.ContentType)
b.WriteString("\n\nreq header:\n\n")
for k, v := range pr.hr.Header {
fmt.Fprintf(&b, "\t%s: %v\n", k, v)
}
b.WriteString("\n")
fmt.Fprintf(&b, "req body: %d\n\n", len(r.Body))
if len(r.Body) > 0 {
fmt.Fprintf(&b, "%s\n\n", r.Body)
}
req.Header.Set(`Content-Type`, `application/json`)
req.Header.Set(`Authorization`, `Bearer `+config.OpenAIKey) req.Header.Set(`Authorization`, `Bearer `+config.OpenAIKey)
client := &http.Client{ client := &http.Client{
Timeout: 30 * time.Second, // Timeout: 30 * time.Second,
} }
rsp, err := client.Do(req) rsp, err := client.Do(req)
if err != nil { if err != nil {
fmt.Fprintf(&b, "client.Do fail: %s\n", err.Error()) fmt.Fprintf(b, "client.Do fail: %s\n", err.Error())
return return
} }
@@ -55,15 +47,15 @@ func (pr *row) fetchRemote() (ab []byte, ok bool, err error) {
b.WriteString(err.Error()) b.WriteString(err.Error())
} }
b.WriteString("req header:\n\n") b.WriteString("rsp header:\n\n")
for k, v := range rsp.Header { for k, v := range rsp.Header {
fmt.Fprintf(&b, "\t%s: %v\n", k, v) fmt.Fprintf(b, "\t%s: %v\n", k, v)
} }
b.WriteString("\n") b.WriteString("\n")
ab, err = io.ReadAll(rsp.Body) ab, err = io.ReadAll(rsp.Body)
if err != nil { fmt.Fprintf(b, "rsp body: %d %v\n\n", len(ab), err)
fmt.Fprintf(&b, "rsp body: %d %v\n\n", len(ab), err) if err == nil {
b.Write(ab) b.Write(ab)
} }

View File

@@ -14,7 +14,11 @@ func doMetrics(ab []byte, cached bool, r *http.Request) {
metrics.RspBytes(len(ab)) metrics.RspBytes(len(ab))
o := &pb.Rsp{} o := &pb.Rsp{}
json.Unmarshal(ab, o) err := json.Unmarshal(ab, o)
if err != nil {
zj.J(`unmarshal fail`, err)
return
}
u := o.GetUsage() u := o.GetUsage()
if u == nil { if u == nil {

View File

@@ -13,7 +13,7 @@ func (c *Core) add(req *pb.Req, hr *http.Request) (pr *row, cached bool) {
c.mux.Lock() c.mux.Lock()
pr, ok := c.pool[hash] pr, ok := c.pool[hash]
if ok { if false && ok {
zj.F(`hit %x`, hash) zj.F(`hit %x`, hash)
c.mux.Unlock() c.mux.Unlock()
cached = true cached = true
@@ -27,7 +27,7 @@ func (c *Core) add(req *pb.Req, hr *http.Request) (pr *row, cached bool) {
t: time.Now(), t: time.Now(),
} }
pr.mux.Lock() pr.mux.Lock()
pr.run() go pr.run()
c.pool[hash] = pr c.pool[hash] = pr
c.mux.Unlock() c.mux.Unlock()
return return

View File

@@ -9,20 +9,30 @@ import (
) )
func (c *Core) getAB(p *pb.Req, r *http.Request) (ab []byte, cached bool, err error) { func (c *Core) getAB(p *pb.Req, r *http.Request) (ab []byte, cached bool, err error) {
ab, ok := tryCache(p)
if ok { canCache := p.Method != http.MethodGet && p.Method != http.MethodDelete
cached = true
return canCache = false
if canCache {
var ok bool
ab, ok = tryCache(p)
if ok {
cached = true
return
}
} }
pr, cached := c.add(p, r) pr, cached := c.add(p, r)
go func() { if canCache {
reqFile := util.CacheName(p.Hash()) + `-req.json` go func() {
if !util.FileExists(reqFile) { reqFile := util.CacheName(p.Hash()) + `-req.json`
util.WriteFile(reqFile, p.Body) if !util.FileExists(reqFile) {
} util.WriteFile(reqFile, p.Body)
}() }
}()
}
pr.wait() pr.wait()
@@ -33,27 +43,31 @@ func (c *Core) getAB(p *pb.Req, r *http.Request) (ab []byte, cached bool, err er
func req(w http.ResponseWriter, r *http.Request) (p *pb.Req, err error) { func req(w http.ResponseWriter, r *http.Request) (p *pb.Req, err error) {
path := r.URL.Path url := r.URL.String()
method := r.Method method := r.Method
contentType := r.Header.Get(`Content-Type`)
if contentType == `` {
contentType = `application/json`
}
if path == `/favicon.ico` { if url == `/favicon.ico` {
err = errSkip err = errSkip
return return
} }
ab, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1024*1024)) ab, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1024*1024*10))
go util.WriteFile(`last-req.json`, ab)
if err != nil { if err != nil {
return return
} }
go util.WriteFile(`last-req.json`, ab)
p = &pb.Req{ p = &pb.Req{
Path: path, Url: url,
Method: method, Method: method,
Body: ab, ContentType: contentType,
Body: ab,
} }
zj.F(`%x %s %s`, p.Hash(), method, path) zj.F(`%x %s %s %s`, p.Hash(), method, url, contentType)
return return
} }

View File

@@ -6,36 +6,40 @@ import (
"net/http" "net/http"
"project/pb" "project/pb"
"project/util" "project/util"
"project/zj"
"sync" "sync"
"time" "time"
) )
type row struct { type row struct {
hash [16]byte hash [16]byte
hr *http.Request hr *http.Request
req *pb.Req req *pb.Req
rsp []byte rsp []byte
err error err error
done bool done bool
t time.Time t time.Time
mux sync.RWMutex mux sync.RWMutex
failLog *bytes.Buffer log *bytes.Buffer
} }
func (pr *row) run() { func (pr *row) run() {
pr.t = time.Now() pr.t = time.Now()
s := fmt.Sprintf(`%x, %s`, pr.hash, pr.t.Format(`2006-01-02 15:04:05`)) pr.startLog()
zj.J(`new`, s)
if pr.req.Method != http.MethodGet {
// pr.mux.Unlock()
// return
}
var ok bool var ok bool
pr.rsp, ok, pr.err = pr.fetchRemote() pr.rsp, ok, pr.err = pr.fetchRemote()
if pr.err == nil && ok { if pr.err == nil && ok {
pr.failLog.Reset() // pr.failLog.Reset()
go writeFailLog(pr.hash, pr.log.Bytes())
} else { } else {
go writeFailLog(pr.hash, pr.failLog.Bytes()) go writeFailLog(pr.hash, pr.log.Bytes())
} }
go pr.saveFile() go pr.saveFile()
@@ -56,3 +60,24 @@ func (pr *row) saveFile() {
rspFile := rspCacheFile(pr.req) rspFile := rspCacheFile(pr.req)
util.WriteFile(rspFile, pr.rsp) util.WriteFile(rspFile, pr.rsp)
} }
func (pr *row) startLog() {
var b bytes.Buffer
pr.log = &b
b.WriteString(time.Now().Format("2006-01-02 15:04:05.000\n"))
b.WriteString(pr.hr.Method + ` ` + pr.hr.URL.String())
b.WriteString("\n\nreq header:\n\n")
for k, v := range pr.hr.Header {
fmt.Fprintf(&b, "\t%s: %v\n", k, v)
}
b.WriteString("\n")
body := pr.req.Body
fmt.Fprintf(&b, "req body: %d\n\n", len(body))
if len(body) > 0 {
b.Write(body)
b.WriteString("\n\n")
}
}

View File

@@ -10,7 +10,7 @@ import (
var errSkip = errors.New(`skip`) var errSkip = errors.New(`skip`)
// WebHandle ... // WebHandle ...
func (c *Core) WebHandle(w http.ResponseWriter, r *http.Request) { func (c *Core) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p, err := req(w, r) p, err := req(w, r)
if err != nil { if err != nil {

View File

@@ -12,7 +12,7 @@ func (x *Req) Hash() [16]byte {
io.WriteString(m, x.Method) io.WriteString(m, x.Method)
m.Write([]byte{0x00}) m.Write([]byte{0x00})
io.WriteString(m, x.Method) io.WriteString(m, x.Url)
m.Write([]byte{0x00}) m.Write([]byte{0x00})
m.Write(x.Body) m.Write(x.Body)

View File

@@ -16,6 +16,8 @@ func Start() {
zj.Init() zj.Init()
// zj.J(`key`, config.OpenAIKey)
// tmptest.Test() // tmptest.Test()
go web.Server() go web.Server()

View File

@@ -16,7 +16,7 @@ func Server() {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle(`/_metrics`, promhttp.Handler()) mux.Handle(`/_metrics`, promhttp.Handler())
mux.HandleFunc(`/`, core.NewCore().WebHandle) mux.Handle(`/`, core.NewCore())
s := &http.Server{ s := &http.Server{
Addr: config.WebAddr, Addr: config.WebAddr,