Compare commits

..

3 Commits

Author SHA1 Message Date
Roy Han
2647a0e443 num parallel embed 2024-07-26 15:18:35 -07:00
Michael Yang
ec4c35fe99 Merge pull request #5512 from ollama/mxyng/detect-stop
autodetect stop parameters from template
2024-07-26 13:48:23 -07:00
Michael Yang
ebc529cbb3 autodetect stop parameters from template 2024-07-12 16:01:23 -07:00
23 changed files with 180 additions and 40 deletions

View File

@@ -44,19 +44,17 @@ type blobDownload struct {
context.CancelFunc
done chan struct{}
done bool
err error
references atomic.Int32
}
type blobDownloadPart struct {
N int
Offset int64
Size int64
Completed atomic.Int64
lastUpdatedMu sync.Mutex
lastUpdated time.Time
N int
Offset int64
Size int64
Completed int64
lastUpdated time.Time
*blobDownload `json:"-"`
}
@@ -74,7 +72,7 @@ func (p *blobDownloadPart) Name() string {
}
func (p *blobDownloadPart) StartsAt() int64 {
return p.Offset + p.Completed.Load()
return p.Offset + p.Completed
}
func (p *blobDownloadPart) StopsAt() int64 {
@@ -84,9 +82,7 @@ func (p *blobDownloadPart) StopsAt() int64 {
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
n = len(b)
p.blobDownload.Completed.Add(int64(n))
p.lastUpdatedMu.Lock()
p.lastUpdated = time.Now()
p.lastUpdatedMu.Unlock()
return n, nil
}
@@ -96,8 +92,6 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *r
return err
}
b.done = make(chan struct{})
for _, partFilePath := range partFilePaths {
part, err := b.readPart(partFilePath)
if err != nil {
@@ -105,7 +99,7 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *r
}
b.Total += part.Size
b.Completed.Add(part.Completed.Load())
b.Completed.Add(part.Completed)
b.Parts = append(b.Parts, part)
}
@@ -145,7 +139,6 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *r
}
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions) {
defer close(b.done)
b.err = b.run(ctx, requestURL, opts)
}
@@ -237,7 +230,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
g.SetLimit(numDownloadParts)
for i := range b.Parts {
part := b.Parts[i]
if part.Completed.Load() == part.Size {
if part.Completed == part.Size {
continue
}
@@ -245,7 +238,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
var err error
for try := 0; try < maxRetries; try++ {
w := io.NewOffsetWriter(file, part.StartsAt())
err = b.downloadChunk(inner, directURL, w, part)
err = b.downloadChunk(inner, directURL, w, part, opts)
switch {
case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
// return immediately if the context is canceled or the device is out of space
@@ -286,31 +279,29 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
return err
}
b.done = true
return nil
}
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart) error {
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *registryOptions) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", requestURL.String(), nil)
if err != nil {
return err
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
resp, err := http.DefaultClient.Do(req)
headers := make(http.Header)
headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, opts)
if err != nil {
return err
}
defer resp.Body.Close()
n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed.Load())
n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed)
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
// rollback progress
b.Completed.Add(-n)
return err
}
part.Completed.Add(n)
part.Completed += n
if err := b.writePart(part.Name(), part); err != nil {
return err
}
@@ -324,21 +315,15 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
for {
select {
case <-ticker.C:
if part.Completed.Load() >= part.Size {
if part.Completed >= part.Size {
return nil
}
part.lastUpdatedMu.Lock()
lastUpdated := part.lastUpdated
part.lastUpdatedMu.Unlock()
if !lastUpdated.IsZero() && time.Since(lastUpdated) > 5*time.Second {
if !part.lastUpdated.IsZero() && time.Since(part.lastUpdated) > 5*time.Second {
const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
// reset last updated
part.lastUpdatedMu.Lock()
part.lastUpdated = time.Time{}
part.lastUpdatedMu.Unlock()
return errPartStalled
}
case <-ctx.Done():
@@ -403,8 +388,6 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
ticker := time.NewTicker(60 * time.Millisecond)
for {
select {
case <-b.done:
return b.err
case <-ticker.C:
fn(api.ProgressResponse{
Status: fmt.Sprintf("pulling %s", b.Digest[7:19]),
@@ -412,6 +395,10 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
Total: b.Total,
Completed: b.Completed.Load(),
})
if b.done || b.err != nil {
return b.err
}
case <-ctx.Done():
return ctx.Err()
}

View File

@@ -263,13 +263,27 @@ func detectChatTemplate(layers []*layerGGML) ([]*layerGGML, error) {
if t, err := template.Named(s); err != nil {
slog.Debug("template detection", "error", err)
} else {
tmpl, err := NewLayer(t.Reader(), "application/vnd.ollama.image.template")
layer, err := NewLayer(t.Reader(), "application/vnd.ollama.image.template")
if err != nil {
return nil, err
}
tmpl.status = fmt.Sprintf("using autodetected template %s", t.Name)
layers = append(layers, &layerGGML{tmpl, nil})
layer.status = fmt.Sprintf("using autodetected template %s", t.Name)
layers = append(layers, &layerGGML{layer, nil})
if t.Parameters != nil {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(t.Parameters); err != nil {
return nil, err
}
layer, err := NewLayer(&b, "application/vnd.ollama.image.params")
if err != nil {
return nil, err
}
layers = append(layers, &layerGGML{layer, nil})
}
}
}
}

View File

@@ -599,9 +599,10 @@ func TestCreateDetectTemplate(t *testing.T) {
}
checkFileExists(t, filepath.Join(p, "blobs", "*"), []string{
filepath.Join(p, "blobs", "sha256-0d79f567714c62c048378f2107fb332dabee0135d080c302d884317da9433cc5"),
filepath.Join(p, "blobs", "sha256-553c4a3f747b3d22a4946875f1cc8ed011c2930d83f864a0c7265f9ec0a20413"),
filepath.Join(p, "blobs", "sha256-c608dc615584cd20d9d830363dabf8a4783ae5d34245c3d8c115edb3bc7b28e4"),
filepath.Join(p, "blobs", "sha256-f836ee110db21567f826332e4cedd746c06d10664fd5a9ea3659e3683a944510"),
filepath.Join(p, "blobs", "sha256-ea34c57ba5b78b740aafe2aeb74dc6507fc3ad14170b64c26a04fb9e36c88d75"),
})
})

View File

@@ -132,6 +132,8 @@ func (s *Scheduler) processPending(ctx context.Context) {
if len(pending.model.ProjectorPaths) > 0 && numParallel != 1 {
numParallel = 1
slog.Warn("multimodal models don't support parallel requests yet")
} else if strings.Contains(pending.model.Config.ModelFamily, "bert") {
numParallel = runtime.NumCPU()
}
for {

8
template/alfred.json Normal file
View File

@@ -0,0 +1,8 @@
{
"stop": [
"<start_system>",
"<end_message>",
"<start_user>",
"<start_assistant>"
]
}

6
template/alpaca.json Normal file
View File

@@ -0,0 +1,6 @@
{
"stop": [
"### Instruction:",
"### Response"
]
}

6
template/chatml.json Normal file
View File

@@ -0,0 +1,6 @@
{
"stop": [
"<|im_start|>",
"<|im_end|>"
]
}

8
template/chatqa.json Normal file
View File

@@ -0,0 +1,8 @@
{
"stop": [
"System:",
"User:",
"Assistant:",
"<|begin_of_text|>"
]
}

View File

@@ -0,0 +1,7 @@
{
"stop": [
"Source:",
"Destination:",
"<step>"
]
}

View File

@@ -0,0 +1,6 @@
{
"stop": [
"User:",
"Assistant:"
]
}

View File

@@ -0,0 +1,6 @@
{
"stop": [
"<start_of_turn>",
"<end_of_turn>"
]
}

View File

@@ -0,0 +1,7 @@
{
"stop": [
"System:",
"Question:",
"Answer:"
]
}

View File

@@ -0,0 +1,8 @@
{
"stop": [
"[INST]",
"[/INST]",
"<<SYS>>",
"<</SYS>>"
]
}

View File

@@ -0,0 +1,7 @@
{
"stop": [
"<|start_header_id|>",
"<|end_header_id|>",
"<|eot_id|>"
]
}

6
template/magicoder.json Normal file
View File

@@ -0,0 +1,6 @@
{
"stop": [
"@@ Instruction",
"@@ Response"
]
}

View File

@@ -0,0 +1,6 @@
{
"stop": [
"<|im_start|>",
"<|im_end|>"
]
}

5
template/openchat.json Normal file
View File

@@ -0,0 +1,5 @@
{
"stop": [
"<|end_of_turn|>"
]
}

8
template/phi-3.json Normal file
View File

@@ -0,0 +1,8 @@
{
"stop": [
"<|end|>",
"<|system|>",
"<|user|>",
"<|assistant|>"
]
}

View File

@@ -0,0 +1,7 @@
{
"stop": [
"### System:",
"### User:",
"### Assistant"
]
}

View File

@@ -0,0 +1,7 @@
{
"stop": [
"### Instruction",
"### Response",
"<|endoftext|>"
]
}

View File

@@ -23,6 +23,7 @@ import (
var indexBytes []byte
//go:embed *.gotmpl
//go:embed *.json
var templatesFS embed.FS
var templatesOnce = sync.OnceValues(func() ([]*named, error) {
@@ -39,6 +40,15 @@ var templatesOnce = sync.OnceValues(func() ([]*named, error) {
// normalize line endings
t.Bytes = bytes.ReplaceAll(bts, []byte("\r\n"), []byte("\n"))
params, err := templatesFS.ReadFile(t.Name + ".json")
if err != nil {
continue
}
if err := json.Unmarshal(params, &t.Parameters); err != nil {
return nil, err
}
}
return templates, nil
@@ -48,6 +58,10 @@ type named struct {
Name string `json:"name"`
Template string `json:"template"`
Bytes []byte
Parameters *struct {
Stop []string `json:"stop"`
}
}
func (t named) Reader() io.Reader {

6
template/vicuna.json Normal file
View File

@@ -0,0 +1,6 @@
{
"stop": [
"USER:",
"ASSISTANT:"
]
}

8
template/zephyr.json Normal file
View File

@@ -0,0 +1,8 @@
{
"stop": [
"<|system|>",
"</s>",
"<|user|>",
"<|assistant|>"
]
}