From f0c0db8e945f5bb4accf62fe1f6f7f42ba5bb210 Mon Sep 17 00:00:00 2001 From: Zheng Kai Date: Mon, 12 Jun 2023 18:25:43 +0800 Subject: [PATCH] es index --- server/src/es/index.go | 51 ++++++++++++++++++++++++++++++++++ server/src/es/init.go | 2 ++ server/src/es/insert.go | 4 ++- server/src/es/tpl/mapping.json | 43 ++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 server/src/es/index.go create mode 100644 server/src/es/tpl/mapping.json diff --git a/server/src/es/index.go b/server/src/es/index.go new file mode 100644 index 0000000..5c2487b --- /dev/null +++ b/server/src/es/index.go @@ -0,0 +1,51 @@ +package es + +import ( + "fmt" + "project/config" + "project/zj" + "strings" + "time" + + _ "embed" // + + "github.com/zhengkai/zu" +) + +//go:embed tpl/mapping.json +var indexMapping string + +func indexName(ts uint32) string { + + index := `orca-metrics` + if !config.Prod { + index = `dev-` + index + } + + index = fmt.Sprintf(`%s-%s`, index, time.Unix(int64(ts), 0).Format(`2006-01-02`)) + + return index +} + +func createIndex() { + + ts := zu.TS() + + zj.J(`index name:`, indexName(ts)) + + mapping(ts) + mapping(ts + 86400) + go func() { + for { + time.Sleep(time.Hour * 3) + mapping(zu.TS() + 86400) + } + }() +} + +func mapping(ts uint32) { + theClient.Indices.Create( + indexName(ts), + theClient.Indices.Create.WithBody(strings.NewReader(indexMapping)), + ) +} diff --git a/server/src/es/init.go b/server/src/es/init.go index 96a9576..acd2ecf 100644 --- a/server/src/es/init.go +++ b/server/src/es/init.go @@ -28,5 +28,7 @@ func Init() (err error) { zj.J(`elasticsearch`, res.String()) + createIndex() + return } diff --git a/server/src/es/insert.go b/server/src/es/insert.go index 6a9a8bd..f63b4ae 100644 --- a/server/src/es/insert.go +++ b/server/src/es/insert.go @@ -22,7 +22,9 @@ func Insert(d *pb.EsMetrics) { // theClient.Create(`orca-metrics`, d.ID, bytes.NewReader(ab)) - re, err := theClient.Index(`orca-metrics`, bytes.NewReader(ab)) + index := indexName(uint32(d.Ts / 1000)) + + re, err := theClient.Index(index, bytes.NewReader(ab)) if err != nil { return } diff --git a/server/src/es/tpl/mapping.json b/server/src/es/tpl/mapping.json new file mode 100644 index 0000000..e278209 --- /dev/null +++ b/server/src/es/tpl/mapping.json @@ -0,0 +1,43 @@ +{ + "mappings": { + "properties": { + "ID": { + "type": "keyword" + }, + "token": { + "properties": { + "total": { + "type": "long" + }, + "completion": { + "type": "long" + }, + "prompt": { + "type": "long" + } + } + }, + "cached": { + "type": "boolean" + }, + "ip": { + "type": "ip" + }, + "model": { + "type": "keyword" + }, + "key": { + "type": "keyword" + }, + "reqBytes": { + "type": "long" + }, + "rspBytes": { + "type": "long" + }, + "ts": { + "type": "date" + } + } + } +}