diff --git a/proto/es.proto b/proto/es.proto new file mode 100644 index 0000000..bc9a659 --- /dev/null +++ b/proto/es.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; +option go_package = "/pb"; +package pb; + +message EsMetrics { + string ID = 1; + EsMetricsToken token = 2; + bool cached = 3; + string ip = 4; + string model = 5; + string key = 6; + uint32 reqBytes = 7; + uint32 rspBytes = 8; + uint64 ts = 9; +} + +message EsMetricsToken { + uint32 total = 1; + uint32 completion = 2; + uint32 prompt = 3; +} diff --git a/server/src/config/config.go b/server/src/config/config.go index 3a2ff67..93e119d 100644 --- a/server/src/config/config.go +++ b/server/src/config/config.go @@ -13,4 +13,7 @@ var ( OpenAIBase = `https://api.openai.com` OpenAIKey = `` + + ESUser = `` + ESPass = `` ) diff --git a/server/src/config/init.go b/server/src/config/init.go index 94e6ec7..cf86ec0 100644 --- a/server/src/config/init.go +++ b/server/src/config/init.go @@ -17,6 +17,8 @@ func init() { `STATIC_DIR`: &StaticDir, `ORCA_WEB`: &WebAddr, `ORCA_LOG`: &LogDir, + `ORCA_ES_USER`: &ESUser, + `ORCA_ES_PASS`: &ESPass, } for k, v := range list { s := os.Getenv(k) diff --git a/server/src/core/metrics.go b/server/src/core/metrics.go index 50f3330..6c6b915 100644 --- a/server/src/core/metrics.go +++ b/server/src/core/metrics.go @@ -3,16 +3,20 @@ package core import ( "encoding/json" "net/http" + "project/es" "project/metrics" "project/pb" "project/util" "project/zj" "strings" + + "github.com/zhengkai/zu" ) -func doMetrics(ab []byte, cached bool, r *http.Request) { +func doMetrics(ab []byte, cached bool, r *http.Request, reqBytes int) { - metrics.RspBytes(len(ab)) + rspBytes := len(ab) + metrics.RspBytes(rspBytes) o := &pb.Rsp{} err := json.Unmarshal(ab, o) @@ -35,7 +39,8 @@ func doMetrics(ab []byte, cached bool, r *http.Request) { metrics.RspTokenByModel(o.Model, u.TotalTokens) - metrics.RspTokenByKey(strings.TrimPrefix(r.Header.Get(`Authorization`), `Bearer `), u.TotalTokens) + key := strings.TrimPrefix(r.Header.Get(`Authorization`), `Bearer `) + metrics.RspTokenByKey(key, u.TotalTokens) ip, err := util.GetIP(r) sip := ip.String() @@ -43,4 +48,21 @@ func doMetrics(ab []byte, cached bool, r *http.Request) { sip = `unknown` } metrics.RspTokenByIP(sip, u.TotalTokens) + + d := &pb.EsMetrics{ + ID: o.Id, + Token: &pb.EsMetricsToken{ + Total: u.TotalTokens, + Completion: u.CompletionTokens, + Prompt: u.PromptTokens, + }, + Cached: cached, + Ip: sip, + Model: o.Model, + Key: key, + ReqBytes: uint32(reqBytes), + RspBytes: uint32(rspBytes), + Ts: zu.MS(), + } + go es.Insert(d) } diff --git a/server/src/core/web.go b/server/src/core/web.go index 18fb369..23a7943 100644 --- a/server/src/core/web.go +++ b/server/src/core/web.go @@ -36,5 +36,5 @@ func (c *Core) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Add(`Content-Type`, `application/json`) w.Write(ab) - go doMetrics(ab, cached, r) + go doMetrics(ab, cached, r, len(p.Body)) } diff --git a/server/src/es/init.go b/server/src/es/init.go new file mode 100644 index 0000000..96a9576 --- /dev/null +++ b/server/src/es/init.go @@ -0,0 +1,32 @@ +package es + +import ( + "project/config" + "project/zj" + + "github.com/elastic/go-elasticsearch/v8" +) + +var theClient *elasticsearch.Client + +// Init ... +func Init() (err error) { + + theClient, err = elasticsearch.NewClient(elasticsearch.Config{ + Username: config.ESUser, + Password: config.ESPass, + }) + if err != nil { + return err + } + + res, err := theClient.Info() + if err != nil { + return err + } + defer res.Body.Close() + + zj.J(`elasticsearch`, res.String()) + + return +} diff --git a/server/src/es/insert.go b/server/src/es/insert.go new file mode 100644 index 0000000..6a9a8bd --- /dev/null +++ b/server/src/es/insert.go @@ -0,0 +1,31 @@ +package es + +import ( + "bytes" + "encoding/json" + "project/pb" +) + +// Insert ... +func Insert(d *pb.EsMetrics) { + + if theClient == nil { + return + } + + ab, err := json.Marshal(d) + if err != nil { + return + } + + // zj.J(string(ab)) + + // theClient.Create(`orca-metrics`, d.ID, bytes.NewReader(ab)) + + re, err := theClient.Index(`orca-metrics`, bytes.NewReader(ab)) + if err != nil { + return + } + defer re.Body.Close() + // zj.J(re.String()) +} diff --git a/server/src/es/test.go b/server/src/es/test.go new file mode 100644 index 0000000..7f2f320 --- /dev/null +++ b/server/src/es/test.go @@ -0,0 +1,49 @@ +package es + +import ( + "fmt" + "math/rand" + "project/pb" + "time" + + "github.com/zhengkai/zu" +) + +// Test ... +func Test() { + + for { + + minute := time.Now().Minute() - 30 + if minute < 0 { + minute = -minute + } + minute += 3 + + modelChoose := []string{ + `gpt-3.5-turbo-0301`, + `gpt-4-0314`, + `text-embedding-ada-002-v2`, + } + model := modelChoose[rand.Intn(len(modelChoose))] + + d := &pb.EsMetrics{ + ID: fmt.Sprintf(`chatcmpl-%d`, zu.MS()), + Token: &pb.EsMetricsToken{ + Total: uint32(rand.Intn(minute * 10)), + Completion: uint32(rand.Intn(minute * 10)), + Prompt: uint32(rand.Intn(minute * 10)), + }, + Cached: true, + Ip: `127.0.0.1`, + Model: model, + Key: `zhengkai.orca`, + ReqBytes: uint32(rand.Intn(minute * 57)), + RspBytes: uint32(rand.Intn(minute * 21)), + Ts: zu.MS(), + } + Insert(d) + + time.Sleep(time.Second / 2) + } +} diff --git a/server/src/go.mod b/server/src/go.mod index 57e9614..b733528 100644 --- a/server/src/go.mod +++ b/server/src/go.mod @@ -3,20 +3,22 @@ module project go 1.20 require ( + github.com/elastic/go-elasticsearch/v8 v8.8.1 github.com/prometheus/client_golang v1.14.0 github.com/zhengkai/life-go v1.0.3 github.com/zhengkai/zog v1.0.3 + github.com/zhengkai/zu v1.0.15 google.golang.org/protobuf v1.30.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/zhengkai/zu v1.0.15 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect ) diff --git a/server/src/go.sum b/server/src/go.sum index 78f58e5..5a0e2df 100644 --- a/server/src/go.sum +++ b/server/src/go.sum @@ -52,7 +52,12 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= +github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch/v8 v8.8.1 h1:/OiP5Yex40q5eWpzFVQIS8jRE7SaEZrFkG9JbE6TXtY= +github.com/elastic/go-elasticsearch/v8 v8.8.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/server/src/project.go b/server/src/project.go index 85f50e3..8124755 100644 --- a/server/src/project.go +++ b/server/src/project.go @@ -3,6 +3,7 @@ package project import ( "project/build" "project/config" + "project/es" "project/web" "project/zj" @@ -16,9 +17,11 @@ func Start() { zj.Init() - // zj.J(`key`, config.OpenAIKey) + es.Init() - // tmptest.Test() + if !config.Prod { + go es.Test() + } go web.Server() diff --git a/server/src/util/json.go b/server/src/util/json.go new file mode 100644 index 0000000..b9be895 --- /dev/null +++ b/server/src/util/json.go @@ -0,0 +1,21 @@ +package util + +import ( + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" +) + +// JSONMarshal ... +func JSONMarshal(m proto.Message) ([]byte, error) { + return protojson.MarshalOptions{ + AllowPartial: true, + }.Marshal(m) +} + +// JSONUnmarshal ... +func JSONUnmarshal(ab []byte, m proto.Message) error { + return protojson.UnmarshalOptions{ + AllowPartial: true, + DiscardUnknown: true, + }.Unmarshal(ab, m) +}