基本完成 1. Client 集中管理;2. 长对话保持。

This commit is contained in:
Havoc412 2024-11-19 02:22:39 +08:00
parent af043befc2
commit 57f7e3f872
16 changed files with 332 additions and 42 deletions

View File

@ -4,15 +4,22 @@ const (
ErrNoContent = ErrNlp + iota
ErrNoDocFound
ErrPythonServierDown
ErrGlmBusy
ErrGlmHistoryLoss
ErrGlmNewClientFail
)
func NlpMsgInit(m msg) {
m[ErrNoContent] = "内容为空"
m[ErrNoDocFound] = "没有找到相关文档"
m[ErrGlmNewClientFail] = "GLM 新建客户端失败"
}
func NlpMsgUserInit(m msg) {
m[ErrNoContent] = "请输入内容"
m[ErrNoDocFound] = "小护没有在知识库中找到相关文档。😿"
m[ErrPythonServierDown] = "小护的🐍python服务挂了此功能暂时无法使用。😿"
m[ErrGlmBusy] = "现在有太多人咨询小护,请稍后再来。"
m[ErrGlmHistoryLoss] = "抱歉!小护找不到之前的会话记录了,我们重新开始新的对话吧。"
// m[ErrGlmNewClientFail] = "小护新建客户端失败了,请稍后再来。"
}

View File

@ -2,6 +2,7 @@ package variable
import (
"catface/app/global/my_errors"
"catface/app/utils/llm_factory"
"catface/app/utils/snow_flake/snowflake_interf"
"catface/app/utils/yml_config/ymlconfig_interf"
"log"
@ -10,7 +11,6 @@ import (
"github.com/casbin/casbin/v2"
"github.com/elastic/go-elasticsearch/v8"
"github.com/yankeguo/zhipu"
"go.uber.org/zap"
"gorm.io/gorm"
)
@ -44,8 +44,8 @@ var (
//casbin 全局操作指针
Enforcer *casbin.SyncedEnforcer
// GLM 全局客户端
GlmClient *zhipu.Client
// GLM 全局客户端集中管理
GlmClientHub *llm_factory.GlmClientHub
// ES 全局客户端
ElasticClient *elasticsearch.Client

View File

@ -2,7 +2,10 @@ package web
import (
"catface/app/global/consts"
"catface/app/global/errcode"
"catface/app/global/variable"
"catface/app/service/nlp"
"catface/app/utils/llm_factory"
"catface/app/utils/response"
"github.com/gin-gonic/gin"
@ -14,7 +17,14 @@ type Nlp struct {
func (n *Nlp) Title(context *gin.Context) {
content := context.GetString(consts.ValidatorPrefix + "content")
newTitle := nlp.GenerateTitle(content)
tempGlmKey := variable.SnowFlake.GetIdAsString()
client, ercode := variable.GlmClientHub.GetOneGlmClient(tempGlmKey, llm_factory.GlmModeSimple)
if ercode > 0 {
response.Fail(context, ercode, errcode.ErrMsg[ercode], errcode.ErrMsgForUser[ercode])
}
defer variable.GlmClientHub.ReleaseOneGlmClient(tempGlmKey) // 临时使用,用完就释放。
newTitle := nlp.GenerateTitle(content, client)
if newTitle != "" {
response.Success(context, consts.CurdStatusOkMsg, gin.H{"title": newTitle})
} else {

View File

@ -5,7 +5,10 @@ import (
"catface/app/global/variable"
"catface/app/model_es"
"catface/app/service/nlp"
"catface/app/utils/llm_factory"
"catface/app/utils/micro_service"
"catface/app/utils/response"
"encoding/json"
"io"
"net/http"
@ -49,6 +52,24 @@ type Rag struct {
func (r *Rag) ChatSSE(context *gin.Context) {
query := context.Query("query")
token := context.Query("token")
// 0-1. 测试 python
if !micro_service.TestLinkPythonService() {
code := errcode.ErrPythonService
response.Fail(context, code, errcode.ErrMsg[code], "")
return
}
// 0-2. 获取一个 GLM Client
if token == "" {
token = variable.SnowFlake.GetIdAsString()
}
client, ercode := variable.GlmClientHub.GetOneGlmClient(token, llm_factory.GlmModeKnowledgeHub)
if ercode != 0 {
response.Fail(context, ercode, errcode.ErrMsg[ercode], errcode.ErrMsgForUser[ercode])
return
}
// 1. query embedding
embedding, ok := nlp.GetEmbedding(query)
@ -73,7 +94,7 @@ func (r *Rag) ChatSSE(context *gin.Context) {
// 3. LLM answer
go func() {
err := nlp.ChatKnoledgeRAG(docs[0].Content, query, ch)
err := nlp.ChatKnoledgeRAG(docs[0].Content, query, ch, client)
if err != nil {
variable.ZapLog.Error("ChatKnoledgeRAG error", zap.Error(err))
}
@ -104,15 +125,56 @@ var upgrader = websocket.Upgrader{ // TEST 测试,先写一个裸的 wss
func (r *Rag) ChatWebSocket(context *gin.Context) {
query := context.Query("query")
token := context.Query("token")
// 0. 协议升级
if token == "" {
token = variable.SnowFlake.GetIdAsString()
}
// 0-1. 协议升级
ws, err := upgrader.Upgrade(context.Writer, context.Request, nil)
if err != nil {
variable.ZapLog.Error("OnOpen error", zap.Error(err))
response.Fail(context, errcode.ErrWebsocketUpgradeFail, errcode.ErrMsg[errcode.ErrWebsocketUpgradeFail], "")
return
}
defer ws.Close()
defer func() { // UPDATE 临时方案,之后考虑结合 jwt 维护的 token 处理。
tokenMsg := struct {
Type string `json:"type"`
Token string `json:"token"`
}{
Type: "token",
Token: token,
}
tokenBytes, _ := json.Marshal(tokenMsg)
err := ws.WriteMessage(websocket.TextMessage, tokenBytes)
if err != nil {
variable.ZapLog.Error("Failed to send token message via WebSocket", zap.Error(err))
}
ws.Close()
}()
// 0-2. 测试 Python 微服务是否启动
if !micro_service.TestLinkPythonService() {
code := errcode.ErrPythonServierDown
err := ws.WriteMessage(websocket.TextMessage, []byte(errcode.ErrMsgForUser[code]))
if err != nil {
variable.ZapLog.Error("Failed to send error message via WebSocket", zap.Error(err))
}
return
}
// 0-3. 从 GLM_HUB 中获取一个可用的 glm client;
client, ercode := variable.GlmClientHub.GetOneGlmClient(token, llm_factory.GlmModeKnowledgeHub)
if ercode != 0 {
variable.ZapLog.Error("GetOneGlmClient error", zap.Error(err))
err := ws.WriteMessage(websocket.TextMessage, []byte(errcode.ErrMsgForUser[ercode]))
if err != nil {
variable.ZapLog.Error("Failed to send error message via WebSocket", zap.Error(err))
}
return
}
// 1. query embedding
embedding, ok := nlp.GetEmbedding(query)
@ -143,7 +205,7 @@ func (r *Rag) ChatWebSocket(context *gin.Context) {
ch := make(chan string) // TIP 建立通道。
go func() {
err := nlp.ChatKnoledgeRAG(docs[0].Content, query, ch)
err := nlp.ChatKnoledgeRAG(docs[0].Content, query, ch, client)
if err != nil {
variable.ZapLog.Error("ChatKnoledgeRAG error", zap.Error(err))
}

View File

@ -12,7 +12,7 @@ import (
// INFO 虽然起名为 Chat但是默认就会去查询 知识库,也就是不作为一般的 LLM-chat 来使用。
type Chat struct {
Query string `form:"query" json:"query" binding:"required"`
// TODO 这里还需要处理一下历史记录?
Token string `form:"token" json:"token"` // UPDATE 暂时不想启用 user 的 token就先单独处理。
}
func (c Chat) CheckParams(context *gin.Context) {

View File

@ -5,16 +5,18 @@ import (
"catface/app/service/nlp/glm"
"fmt"
"strings"
"github.com/yankeguo/zhipu"
)
func GenerateTitle(content string) string {
func GenerateTitle(content string, client *zhipu.ChatCompletionService) string {
message := variable.PromptsYml.GetString("Prompt.Title") + content
title, _ := glm.Chat(message)
title, _ := glm.Chat(message, client)
return title
}
// ChatKnoledgeRAG 使用 RAG 模型进行知识问答
func ChatKnoledgeRAG(doc, query string, ch chan<- string) error {
func ChatKnoledgeRAG(doc, query string, ch chan<- string, client *zhipu.ChatCompletionService) error {
// 读取配置文件中的 KnoledgeRAG 模板
promptTemplate := variable.PromptsYml.GetString("Prompt.KnoledgeRAG")
@ -24,7 +26,7 @@ func ChatKnoledgeRAG(doc, query string, ch chan<- string) error {
// 调用聊天接口
// err := glm.ChatStream(message, ch)
err := glm.BufferedChatStream(message, ch)
err := glm.BufferedChatStream(message, ch, client)
if err != nil {
return fmt.Errorf("调用聊天接口失败: %w", err)
}

View File

@ -4,19 +4,20 @@ import (
"catface/app/global/variable"
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/yankeguo/zhipu"
"go.uber.org/zap"
)
// ChatWithGLM 封装了与GLM模型进行对话的逻辑
func Chat(message string) (string, error) {
service := variable.GlmClient.ChatCompletion("glm-4-flash").
AddMessage(zhipu.ChatCompletionMessage{
Role: "user",
Content: message,
})
func Chat(message string, client *zhipu.ChatCompletionService) (string, error) {
service := client.AddMessage(zhipu.ChatCompletionMessage{
Role: "user",
Content: message,
})
res, err := service.Do(context.Background())
if err != nil {
@ -28,9 +29,8 @@ func Chat(message string) (string, error) {
}
// ChatStream 接收一个消息和一个通道,将流式响应发送到通道中
func ChatStream(message string, ch chan<- string) error {
service := variable.GlmClient.ChatCompletion("glm-4-flash").
AddMessage(zhipu.ChatCompletionMessage{Role: "user", Content: message}).
func ChatStream(message string, ch chan<- string, client *zhipu.ChatCompletionService) error {
service := client.AddMessage(zhipu.ChatCompletionMessage{Role: "user", Content: message}).
SetStreamHandler(func(chunk zhipu.ChatCompletionResponse) error {
content := chunk.Choices[0].Delta.Content
if content != "" {
@ -39,22 +39,30 @@ func ChatStream(message string, ch chan<- string) error {
return nil
})
// Test
messages := client.GetMessages()
for id, message := range messages {
variable.ZapLog.Info(fmt.Sprintf("message-%d", id+1), zap.String("message", message.(zhipu.ChatCompletionMessage).Role), zap.String("content", message.(zhipu.ChatCompletionMessage).Content))
}
// 执行服务调用
_, err := service.Do(context.Background())
res, err := service.Do(context.Background())
if err != nil {
return err
}
// 增加 AI 回答的消息记录。
client.AddMessage(zhipu.ChatCompletionMessage{Role: "assistant", Content: res.Choices[0].Message.Content})
return nil
}
// 带缓冲机制的 ChatStream计数 & 计时 双判定。
func BufferedChatStream(message string, ch chan<- string) error {
func BufferedChatStream(message string, ch chan<- string, client *zhipu.ChatCompletionService) error {
bufferedCh := make(chan string) // 带缓冲的通道缓冲大小为10
timer := time.NewTimer(500 * time.Millisecond) // 定时器500毫秒
go func() {
err := ChatStream(message, bufferedCh)
err := ChatStream(message, bufferedCh, client)
if err != nil {
return
}

View File

@ -0,0 +1,116 @@
package llm_factory
import (
"catface/app/global/errcode"
"time"
"github.com/yankeguo/zhipu"
)
// INFO 维护 GLM Client 与用户之间的客户端消息队列,也就是在 "github.com/yankeguo/zhipu" 的基础上实现一层封装。
type GlmClientHub struct {
MaxIdle int
MaxActive int
ApiKey string
DefaultModelName string
InitPrompt string
Clients map[string]*ClientInfo
LifeTime time.Duration
}
type ClientInfo struct {
Client *zhipu.ChatCompletionService
LastUsed time.Time
}
func InitGlmClientHub(maxIdle, maxActive, lifetime int, apiKey, defaultModelName, initPrompt string) *GlmClientHub {
hub := &GlmClientHub{
MaxIdle: maxIdle,
MaxActive: maxActive,
ApiKey: apiKey,
DefaultModelName: defaultModelName,
InitPrompt: initPrompt,
Clients: make(map[string]*ClientInfo),
LifeTime: time.Duration(lifetime) * time.Second,
}
go hub.cleanupRoutine() // 启动定时器清理过期会话。
return hub
}
const (
GlmModeSimple = iota
GlmModeKnowledgeHub
)
/**
* @description: 鉴权用户之后根据其 ID 来从 map池 里获取之前的连接
* // UPDATE 现在只是单用户单连接(也就是只支持“同时只有一个对话”),之后可以考虑扩展【消息队列】的封装方式。
* 默认启用的是 没有预设的 prompt 的空
* @param {string} token // TODO 如何在 token 中保存信息?
* @return {*}
*/
func (g *GlmClientHub) GetOneGlmClient(token string, mode int) (client *zhipu.ChatCompletionService, code int) {
if info, ok := g.Clients[token]; ok {
info.LastUsed = time.Now() // INFO 刷新生命周期
return info.Client, 0
}
// 空闲数检查
if g.MaxIdle > 0 {
g.MaxIdle -= 1
} else {
code = errcode.ErrGlmBusy
return
}
// Client Init
preClient, err := zhipu.NewClient(zhipu.WithAPIKey(g.ApiKey))
if err != nil {
code = errcode.ErrGlmNewClientFail
return
}
client = preClient.ChatCompletion(g.DefaultModelName)
if mode == GlmModeKnowledgeHub {
client.AddMessage(zhipu.ChatCompletionMessage{
Role: zhipu.RoleSystem, // TIP 使用 System 角色来初始化对话
Content: g.InitPrompt,
})
}
g.Clients[token] = &ClientInfo{
Client: client,
LastUsed: time.Now(),
}
return
}
// cleanupRoutine 定期检查并清理超过 1 小时未使用的 Client
func (g *GlmClientHub) cleanupRoutine() {
ticker := time.NewTicker(10 * time.Minute)
for range ticker.C {
g.cleanupClients()
}
}
// cleanupClients 清理超过 1 小时未使用的 Client
func (g *GlmClientHub) cleanupClients() {
now := time.Now()
for token, info := range g.Clients {
if now.Sub(info.LastUsed) > g.LifeTime {
delete(g.Clients, token)
g.MaxIdle += 1
}
}
}
/**
* @description: 显式地释放资源
* @param {string} token
* @return {*}
*/
func (g *GlmClientHub) ReleaseOneGlmClient(token string) {
delete(g.Clients, token)
g.MaxIdle += 1
}

View File

@ -2,10 +2,18 @@ package micro_service
import (
"catface/app/global/variable"
"context"
"fmt"
"strings"
"github.com/carlmjohnson/requests"
)
func TestLinkPythonService() bool {
err := requests.URL(FetchPythonServiceUrl("link_test")).Fetch(context.Background())
return err == nil
}
func FetchPythonServiceUrl(url string) string {
// 检查 url 是否以 / 开头,如果是则去掉开头的 /
if strings.HasPrefix(url, "/") {

View File

@ -4,6 +4,7 @@ import (
"catface/app/global/consts"
"catface/app/global/variable"
"catface/app/utils/snow_flake/snowflake_interf"
"strconv"
"sync"
"time"
)
@ -45,3 +46,9 @@ func (s *snowflake) GetId() int64 {
r := (now-consts.StartTimeStamp)<<consts.TimestampShift | (s.machineId << consts.MachineIdShift) | (s.sequence)
return r
}
// 简单将 id 转化为 string 使用。
func (s *snowflake) GetIdAsString() string {
id := s.GetId()
return strconv.FormatInt(id, 10)
}

View File

@ -2,4 +2,5 @@ package snowflake_interf
type InterfaceSnowFlake interface {
GetId() int64
GetIdAsString() string
}

View File

@ -8,6 +8,7 @@ import (
"catface/app/service/sys_log_hook"
"catface/app/utils/casbin_v2"
"catface/app/utils/gorm_v2"
"catface/app/utils/llm_factory"
"catface/app/utils/snow_flake"
"catface/app/utils/validator_translation"
"catface/app/utils/websocket/core"
@ -17,7 +18,6 @@ import (
"os"
"github.com/elastic/go-elasticsearch/v8"
"github.com/yankeguo/zhipu"
)
func checkRequiredFolders() {
@ -114,15 +114,18 @@ func init() {
log.Fatal(my_errors.ErrorsValidatorTransInitFail + err.Error())
}
// 11. GLM 客户端启动
var err error
variable.GlmClient, err = zhipu.NewClient(zhipu.WithAPIKey(variable.ConfigYml.GetString("Glm.ApiKey")))
if err != nil {
log.Fatal(my_errors.ErrorsGlmClientInitFail + err.Error())
}
// 11. GLM 资源池管理 初始化
variable.GlmClientHub = llm_factory.InitGlmClientHub(
variable.ConfigYml.GetInt("Glm.MaxActive"),
variable.ConfigYml.GetInt("Glm.MaxIdle"),
variable.ConfigYml.GetInt("Glm.LifeTime"),
variable.ConfigYml.GetString("Glm.ApiKey"),
variable.ConfigYml.GetString("Glm.DefaultModelName"),
variable.PromptsYml.GetString("Prompt.InitPrompt"),
)
// 12. ES 客户端启动
var err error
variable.ElasticClient, err = elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{variable.ConfigYml.GetString("ElasticSearch.Addr")},
})

View File

@ -161,7 +161,11 @@ Weixin:
Glm:
ApiKey: "0cf510ebc01599dba2a593069c1bdfbc.nQBQ4skP8xBh7ijU"
DefaultModel: "glm-4-flash"
DefaultModelName: "glm-4-flash"
MaxIdle: 100 # INFO 最大空闲数,毕竟 messages 缓存在内存里,需要根据服务器性能调整。
MaxActive: 50 # 最大活跃数,考虑 WebSocket 通信的资源消耗。
MaxActiceOneUser: 1 # TODO 不过从功能设计来说很没有必要,单例就很足够;毕竟又不是专门来提供 GPT 服务的。
LifeTime: 1800 # Client 有效生存周期1800 = 0.5h
# qiNiu 云存储配置
QiNiu:

View File

@ -1,9 +1,21 @@
Prompt:
InitPrompt: "你是一个知识库整理助手,名为“小护”。
你所要做的就是根据知识库搜索的结果回答用户的问题。
1. 你的逻辑和推理应当严谨、智能和可辩护;
2. 必须使用中文回复问题,除非有特别说明;
3. 如果知识库中存在猫猫的精细位置,你只能用来推理,不能二次透露;
4. 你所服务的前端仅支持简单的文本解析,所以你只能用基本的文本格式回答;
5. 如果知识库中的信息无法回答用户的问题就说知识库中未找到合适的资料用户可以选择联系小保的官方QQ账号3144089037尝试咨询
6. 知识库的信息会在消息队列里以 system 的方式和用户 user 区分,做好判断。
以下是不要使用的要求:
1. 不要使用诸如markdown等文本解析方式的标签比如**。" # TODO 不确定 6 会不会有用。
Title: "请根据以下长文本生成一个合适的标题不需要书名号长度10字内"
KnoledgeRAG: "使用以上下文来回答用户的问题。如果你不知道答案,就说你不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答知识库中没有这个内容,你不知道。"
KnoledgeRAG: "使用以上下文来回答用户的问题,如果无法回答,请回答知识库中未找到符合的资料,我不知道。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答知识库中未找到符合的资料,我不知道。"

51
test/glm_test.go Normal file
View File

@ -0,0 +1,51 @@
package test
import (
"catface/app/global/variable"
_ "catface/bootstrap"
"context"
"testing"
"github.com/yankeguo/zhipu"
)
func TestGlmMessageStore(t *testing.T) {
glmClient, err := zhipu.NewClient(zhipu.WithAPIKey(variable.ConfigYml.GetString("Glm.ApiKey")))
if err != nil {
t.Fatal(err)
}
service := glmClient.ChatCompletion("glm-4-flash").AddMessage(zhipu.ChatCompletionMessage{
Role: "user",
Content: "请你记一下我说的数字2",
})
res, err := service.Do(context.Background())
if err != nil {
apiErrorCode := zhipu.GetAPIErrorCode(err)
t.Fatal(apiErrorCode)
}
t.Log(res.Choices[0].Message.Content)
messages := service.GetMessages()
for _, message := range messages {
t.Log(message.(zhipu.ChatCompletionMessage).Role, message.(zhipu.ChatCompletionMessage).Content)
}
service.AddMessage(zhipu.ChatCompletionMessage{
Role: "user",
Content: "现在请你复述我刚才说的数字。",
})
res, err = service.Do(context.Background())
if err != nil {
apiErrorCode := zhipu.GetAPIErrorCode(err)
t.Fatal(apiErrorCode)
}
messages = service.GetMessages()
for _, message := range messages {
t.Log(message.(zhipu.ChatCompletionMessage).Role, message.(zhipu.ChatCompletionMessage).Content)
}
t.Log(res.Choices[0].Message.Content)
}

View File

@ -179,7 +179,6 @@ func TestStruct(t *testing.T) {
defer redisClient.ReleaseOneRedisClient()
tmp := model_redis.SelectedAnimal4Prefer{
Key: 2,
NewCatsId: []int64{1, 2},
EncounteredCatsId: []int64{3, 4},
}