From c06dbde3da4304bfb5e3a55944b7a24d42559cbb Mon Sep 17 00:00:00 2001
From: Sakurasan <26715255+Sakurasan@users.noreply.github.com>
Date: Tue, 8 Oct 2024 02:49:08 +0800
Subject: [PATCH] update realtime
---
README.md | 4 ++
pkg/openai/realtime.go | 107 ++++++++++++++++++++++++++++++++++++-----
store/cache.go | 8 +++
3 files changed, 108 insertions(+), 11 deletions(-)
diff --git a/README.md b/README.md
index 6acde4b..482f18c 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,8 @@
+[Telegram Chat](https://t.me/OpenTeamChat) [Telegram Channel](https://t.me/OpenTeamLLM)
+
opencatd-open is an open-source, team-shared service for ChatGPT API that can be safely shared with others for API usage.
---
@@ -91,6 +93,8 @@ pandora for team
设置主页跳转地址?
- 修改环境变量 `CUSTOM_REDIRECT=https://your.domain`
+## 获取更多信息
+[Telegram Channel](https://t.me/OpenTeamLLM)
## 赞助
[](https://www.buymeacoffee.com/littlecjun)
diff --git a/pkg/openai/realtime.go b/pkg/openai/realtime.go
index 5c7627e..36dc43d 100644
--- a/pkg/openai/realtime.go
+++ b/pkg/openai/realtime.go
@@ -1,5 +1,6 @@
/*
https://platform.openai.com/docs/guides/realtime
+https://learn.microsoft.com/zh-cn/azure/ai-services/openai/how-to/audio-real-time
wss://my-eastus2-openai-resource.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview-1001
*/
@@ -7,11 +8,14 @@ package openai
import (
"context"
+ "encoding/json"
+ "fmt"
"log"
"net/http"
"net/url"
+ "opencatd-open/pkg/tokenizer"
+ "opencatd-open/store"
"os"
- "time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@@ -20,6 +24,7 @@ import (
// "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
const realtimeURL = "wss://api.openai.com/v1/realtime"
+const azureRealtimeURL = "wss://%s.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview"
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
@@ -37,6 +42,44 @@ type Response struct {
Instructions string `json:"instructions"`
}
+type RealTimeResponse struct {
+ Type string `json:"type"`
+ EventID string `json:"event_id"`
+ Response struct {
+ Object string `json:"object"`
+ ID string `json:"id"`
+ Status string `json:"status"`
+ StatusDetails any `json:"status_details"`
+ Output []struct {
+ ID string `json:"id"`
+ Object string `json:"object"`
+ Type string `json:"type"`
+ Status string `json:"status"`
+ Role string `json:"role"`
+ Content []struct {
+ Type string `json:"type"`
+ Transcript string `json:"transcript"`
+ } `json:"content"`
+ } `json:"output"`
+ Usage Usage `json:"usage"`
+ } `json:"response"`
+}
+
+type Usage struct {
+ TotalTokens int `json:"total_tokens"`
+ InputTokens int `json:"input_tokens"`
+ OutputTokens int `json:"output_tokens"`
+ InputTokenDetails struct {
+ CachedTokens int `json:"cached_tokens"`
+ TextTokens int `json:"text_tokens"`
+ AudioTokens int `json:"audio_tokens"`
+ } `json:"input_token_details"`
+ OutputTokenDetails struct {
+ TextTokens int `json:"text_tokens"`
+ AudioTokens int `json:"audio_tokens"`
+ } `json:"output_token_details"`
+}
+
func RealTimeProxy(c *gin.Context) {
log.Println(c.Request.URL.String())
var model string = c.Query("model")
@@ -52,16 +95,31 @@ func RealTimeProxy(c *gin.Context) {
}
defer clientConn.Close()
+ apikey, err := store.SelectKeyCacheByModel(model)
+ if err != nil {
+ c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+ return
+ }
// 连接到 OpenAI WebSocket
- headers := http.Header{
- "Authorization": []string{"Bearer " + os.Getenv("OPENAI_API_KEY")},
- "OpenAI-Beta": []string{"realtime=v1"},
+ headers := http.Header{"OpenAI-Beta": []string{"realtime=v1"}}
+
+ if apikey.ApiType == "azure" {
+ headers.Set("api-key", apikey.Key)
+ if apikey.EndPoint != "" {
+ realtimeURL = fmt.Sprintf("%s/openai/realtime?api-version=2024-10-01-preview&deployment=gpt-4o-realtime-preview", apikey.EndPoint)
+ } else {
+ realtimeURL = fmt.Sprintf(azureRealtimeURL, apikey.ResourceNmae)
+ }
+ } else {
+ headers.Set("Authorization", "Bearer "+apikey.Key)
}
- conn := websocket.Dialer{
- // Proxy: http.ProxyURL(&url.URL{Scheme: "http", Host: "127.0.0.1:7890"}),
- HandshakeTimeout: 45 * time.Second,
+ conn := websocket.DefaultDialer
+ if os.Getenv("LOCAL_PROXY") != "" {
+ proxyUrl, _ := url.Parse(os.Getenv("LOCAL_PROXY"))
+ conn.Proxy = http.ProxyURL(proxyUrl)
}
+
openAIConn, _, err := conn.Dial(realtimeURL, headers)
if err != nil {
log.Println("OpenAI dial error:", err)
@@ -75,11 +133,11 @@ func RealTimeProxy(c *gin.Context) {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
- return forwardMessages(ctx, clientConn, openAIConn)
+ return forwardMessages(ctx, c, clientConn, openAIConn)
})
g.Go(func() error {
- return forwardMessages(ctx, openAIConn, clientConn)
+ return forwardMessages(ctx, c, openAIConn, clientConn)
})
if err := g.Wait(); err != nil {
@@ -89,7 +147,16 @@ func RealTimeProxy(c *gin.Context) {
}
-func forwardMessages(ctx context.Context, src, dst *websocket.Conn) error {
+func forwardMessages(ctx context.Context, c *gin.Context, src, dst *websocket.Conn) error {
+ usagelog := store.Tokens{Model: "gpt-4o-realtime-preview"}
+
+ token, _ := c.Get("localuser")
+
+ lu, err := store.GetUserByToken(token.(string))
+ if err != nil {
+ return err
+ }
+ usagelog.UserID = int(lu.ID)
for {
select {
case <-ctx.Done():
@@ -102,11 +169,29 @@ func forwardMessages(ctx context.Context, src, dst *websocket.Conn) error {
}
return err
}
- log.Println("Received message:", string(message))
+ if messageType == websocket.TextMessage {
+ var usage Usage
+ err := json.Unmarshal(message, &usage)
+ if err == nil {
+ usagelog.PromptCount += usage.InputTokens
+ usagelog.CompletionCount += usage.OutputTokens
+ }
+
+ }
err = dst.WriteMessage(messageType, message)
if err != nil {
return err
}
}
}
+ defer func() {
+ usagelog.Cost = fmt.Sprintf("%.6f", tokenizer.Cost(usagelog.Model, usagelog.PromptCount, usagelog.CompletionCount))
+ if err := store.Record(&usagelog); err != nil {
+ log.Println(err)
+ }
+ if err := store.SumDaily(usagelog.UserID); err != nil {
+ log.Println(err)
+ }
+ }()
+ return nil
}
diff --git a/store/cache.go b/store/cache.go
index b429fb8..23b3fa8 100644
--- a/store/cache.go
+++ b/store/cache.go
@@ -78,6 +78,14 @@ func SelectKeyCacheByModel(model string) (Key, error) {
var keys []Key
items := KeysCache.Items()
for _, item := range items {
+ if strings.Contains(model, "realtime") {
+ if item.Object.(Key).ApiType == "openai" {
+ keys = append(keys, item.Object.(Key))
+ }
+ if item.Object.(Key).ApiType == "azure" {
+ keys = append(keys, item.Object.(Key))
+ }
+ }
if strings.HasPrefix(model, "gpt-") {
if item.Object.(Key).ApiType == "openai" {
keys = append(keys, item.Object.(Key))