package zhipu
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"sync"
"github.com/gtkit/llm-zhipu/utils"
)
var (
// errorPrefix = []byte(`data: {"error":`).
headerID = []byte("id:")
headerData = []byte("data:")
headerEvent = []byte("event:")
headerMeta = []byte("meta:")
)
type StreamReaderInterface[T streamable] interface {
Recv() (response T, err error)
Close()
processLines() (T, error)
unmarshalError() (errResp *ErrorResponse)
}
type streamable interface {
GlmChatCompletionStreamResponse
}
type streamReader[T streamable] struct {
isFinished bool
reader *bufio.Reader
response *http.Response
errAccumulator utils.ErrorAccumulator
unmarshaler utils.Unmarshaler
}
var _ StreamReaderInterface[GlmChatCompletionStreamResponse] = (*streamReader[GlmChatCompletionStreamResponse])(nil)
type Event struct {
ID []byte
Data []byte
Event []byte
Meta []byte
}
var pool = sync.Pool{
New: func() interface{} {
return &Event{}
},
}
func (stream *streamReader[T]) Recv() (response T, err error) {
if stream.isFinished {
err = io.EOF
return
}
response, err = stream.processLines()
return
}
func (stream *streamReader[T]) processLines() (T, error) {
var (
hasErrorPrefix bool
response T
)
event, ok := pool.Get().(*Event)
if !ok {
return *new(T), nil
}
for {
rawLine, readErr := stream.reader.ReadBytes('\n')
if readErr != nil || hasErrorPrefix {
respErr := stream.unmarshalError()
if respErr != nil {
return *new(T), fmt.Errorf("error, %w", respErr.Error)
}
if errors.Is(readErr, context.Canceled) {
return *new(T), readErr
}
return *new(T), fmt.Errorf("stream read error, %w", readErr)
}
if bytes.Equal(rawLine, []byte("\n")) {
meta := &GlmMeta{}
if len(event.Meta) > 0 {
if err := json.Unmarshal(event.Meta, meta); err != nil {
log.Println("---Meta Unmarshal error:", err)
}
}
response = T{
ID: string(event.ID),
Event: string(event.Event),
Choices: []ChatCompletionStreamChoice{
{
Delta: ChatCompletionStreamChoiceDelta{
Content: string(event.Data),
},
},
},
Meta: *meta,
}
putEvent(event)
return response, nil
}
e, _ := processEvent(rawLine)
if e.Event != nil {
if bytes.Equal(e.Event, []byte("finish")) {
stream.isFinished = true
}
event.Event = e.Event
}
if e.ID != nil {
event.ID = e.ID
}
if e.Data != nil {
event.Data = append(event.Data, e.Data...)
}
if e.Meta != nil {
event.Meta = e.Meta
}
}
}
func putEvent(e *Event) {
e.ID = nil
e.Event = nil
e.Data = nil
pool.Put(e)
}
func (stream *streamReader[T]) unmarshalError() (errResp *ErrorResponse) {
errBytes := stream.errAccumulator.Bytes()
if len(errBytes) == 0 {
return
}
err := stream.unmarshaler.Unmarshal(errBytes, &errResp)
if err != nil {
errResp = nil
}
return
}
func (stream *streamReader[T]) Close() {
stream.response.Body.Close()
}
func processEvent(msg []byte) (event *Event, err error) {
var e Event
if len(msg) < 1 {
return nil, errors.New("event message was empty")
}
switch {
case bytes.HasPrefix(msg, headerID):
e.ID = append([]byte(nil), trimHeader(len(headerID), msg)...)
case bytes.HasPrefix(msg, headerData):
e.Data = append(e.Data, trimHeader(len(headerData), msg)...)
if bytes.Equal(msg, []byte("data: \n")) {
e.Data = append(e.Data, byte('\n'))
}
// The spec says that a line that simply contains the string "data" should be treated
// as a data field with an empty body.
case bytes.Equal(msg, bytes.TrimSuffix(headerData, []byte(":"))):
e.Data = append(e.Data, byte('\n'))
case bytes.HasPrefix(msg, headerEvent):
e.Event = append([]byte(nil), trimHeader(len(headerEvent), msg)...)
case bytes.HasPrefix(msg, headerMeta):
e.Meta = append([]byte(nil), trimHeader(len(headerMeta), msg)...)
default:
// Ignore any garbage that doesn't match what we're looking for.
}
// Trim the last "\n" per the spec.
if len(e.Data) > 1 {
e.Data = bytes.TrimSuffix(e.Data, []byte("\n"))
}
return &e, err
}
func trimHeader(size int, data []byte) []byte {
if data == nil || len(data) < size {
return data
}
data = data[size:]
// Remove optional leading whitespace
if len(data) > 0 && data[0] == 32 {
data = data[1:]
}
return data
}
没有合适的资源?快使用搜索试试~ 我知道了~
《AI大模型》--智谱Ai大模型.zip
共20个文件
go:14个
mod:1个
gitignore:1个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 194 浏览量
2024-03-22
16:23:57
上传
评论
收藏 21KB ZIP 举报
温馨提示
人工智能学习总结成果,希望可以帮到大家,有疑问欢迎随时沟通~ 人工智能学习总结成果,希望可以帮到大家,有疑问欢迎随时沟通~ 人工智能学习总结成果,希望可以帮到大家,有疑问欢迎随时沟通~ 人工智能学习总结成果,希望可以帮到大家,有疑问欢迎随时沟通~ 人工智能学习总结成果,希望可以帮到大家,有疑问欢迎随时沟通~
资源推荐
资源详情
资源评论
收起资源包目录
《AI大模型》--智谱Ai大模型.zip (20个子文件)
chat_stream.go 1KB
chat.go 3KB
go.mod 103B
stream_reader.go 4KB
go.sum 364B
Makefile 734B
LICENSE 11KB
token.go 1KB
common.go 211B
utils
error_accumulator.go 692B
request_builder.go 1KB
request_builder_test.go 2KB
unmarshaler.go 243B
marshaller.go 234B
config.go 497B
client.go 4KB
test
zpchat_test.go 1KB
.golangci.yml 13KB
.gitignore 312B
error.go 2KB
共 20 条
- 1
资源评论
季风泯灭的季节
- 粉丝: 599
- 资源: 2921
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- SSCMS登录模块需要的JS文件
- JSP网络购物中心毕业设计(源代码+论文).rar
- 白盒测试报告.docx
- 基于LM5117芯片评估开发板硬件参考设计(原理图+PCB)+中英文数据手册资料.zip
- 照片批量重命名软件(文件批量修改图片文件名)
- app.apk
- 人工智能(AI)是计算机科学的一个分支,旨在开发和应用能够模拟、延伸和扩展人类智能的理论、方法和技术,包括机器人、语言识别、图像
- 嵌入式与物联网开发是当今信息技术领域的两大重要分支,它们相互交织,共同推动着智能化时代的进步 嵌入式开发主要关注在嵌入式操作
- 网络安全,这一看似高深莫测的领域,实则与我们每个人的生活息息相关
- 毕业设计基于深度学习的视觉问答系统源码+文档说明+答辩PPT.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功