Mirror of https://github.com/ollama/ollama.git (synced 2026-01-03 21:19:20 -05:00)

Compare commits: roy-embed-...bmizerany/ (2 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 348effebf9 |  |
|  | 903d9b1a41 |  |
@@ -44,17 +44,19 @@ type blobDownload struct {
 
 	context.CancelFunc
 
-	done       bool
+	done       chan struct{}
 	err        error
 	references atomic.Int32
 }
 
 type blobDownloadPart struct {
-	N           int
-	Offset      int64
-	Size        int64
-	Completed   int64
-	lastUpdated time.Time
+	N         int
+	Offset    int64
+	Size      int64
+	Completed atomic.Int64
+
+	lastUpdatedMu sync.Mutex
+	lastUpdated   time.Time
 
 	*blobDownload `json:"-"`
 }
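The struct changes above replace the plain Completed int64 counter with atomic.Int64 and pair lastUpdated with a mutex, because a part is written by the download goroutine and read concurrently by a monitor goroutine. Below is a minimal, self-contained sketch of the atomic-counter half of that idea; the names are illustrative, not the repository's.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// part mirrors the idea of a download part whose progress is observed
// concurrently; the atomic counter needs no extra locking.
type part struct {
	Size      int64
	Completed atomic.Int64
}

func main() {
	p := &part{Size: 1 << 20}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 1024; i++ {
			p.Completed.Add(1024) // writer: record downloaded bytes
		}
	}()

	// reader: safe to observe progress while the writer runs
	fmt.Println("in flight:", p.Completed.Load(), "of", p.Size)

	wg.Wait()
	fmt.Println("done:", p.Completed.Load() == p.Size)
}
```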
@@ -72,7 +74,7 @@ func (p *blobDownloadPart) Name() string {
 }
 
 func (p *blobDownloadPart) StartsAt() int64 {
-	return p.Offset + p.Completed
+	return p.Offset + p.Completed.Load()
 }
 
 func (p *blobDownloadPart) StopsAt() int64 {
@@ -82,7 +84,9 @@ func (p *blobDownloadPart) StopsAt() int64 {
 func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
 	n = len(b)
 	p.blobDownload.Completed.Add(int64(n))
+	p.lastUpdatedMu.Lock()
 	p.lastUpdated = time.Now()
+	p.lastUpdatedMu.Unlock()
 	return n, nil
 }
 
@@ -92,6 +96,8 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *r
 		return err
 	}
 
+	b.done = make(chan struct{})
+
 	for _, partFilePath := range partFilePaths {
 		part, err := b.readPart(partFilePath)
 		if err != nil {
@@ -99,7 +105,7 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *r
 		}
 
 		b.Total += part.Size
-		b.Completed.Add(part.Completed)
+		b.Completed.Add(part.Completed.Load())
 		b.Parts = append(b.Parts, part)
 	}
 
@@ -139,6 +145,7 @@ func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *r
 }
 
 func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions) {
+	defer close(b.done)
 	b.err = b.run(ctx, requestURL, opts)
 }
 
@@ -230,7 +237,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
 	g.SetLimit(numDownloadParts)
 	for i := range b.Parts {
 		part := b.Parts[i]
-		if part.Completed == part.Size {
+		if part.Completed.Load() == part.Size {
 			continue
 		}
 
@@ -238,7 +245,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
 		var err error
 		for try := 0; try < maxRetries; try++ {
 			w := io.NewOffsetWriter(file, part.StartsAt())
-			err = b.downloadChunk(inner, directURL, w, part, opts)
+			err = b.downloadChunk(inner, directURL, w, part)
 			switch {
 			case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
 				// return immediately if the context is canceled or the device is out of space
@@ -279,29 +286,31 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
 		return err
 	}
 
-	b.done = true
 	return nil
 }
 
-func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *registryOptions) error {
+func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart) error {
 	g, ctx := errgroup.WithContext(ctx)
 	g.Go(func() error {
-		headers := make(http.Header)
-		headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
-		resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, opts)
+		req, err := http.NewRequestWithContext(ctx, "GET", requestURL.String(), nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
+		resp, err := http.DefaultClient.Do(req)
 		if err != nil {
 			return err
 		}
 		defer resp.Body.Close()
 
-		n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed)
+		n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed.Load())
 		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
 			// rollback progress
 			b.Completed.Add(-n)
 			return err
 		}
 
-		part.Completed += n
+		part.Completed.Add(n)
 		if err := b.writePart(part.Name(), part); err != nil {
 			return err
 		}
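This hunk issues a plain ranged GET and keeps counting progress by routing the response body through io.TeeReader. Below is a self-contained sketch of those two standard-library pieces; the URL and byte offsets are placeholders, not values from the diff.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sync/atomic"
)

// progress counts bytes as they stream through io.TeeReader.
type progress struct{ n atomic.Int64 }

func (p *progress) Write(b []byte) (int, error) {
	p.n.Add(int64(len(b)))
	return len(b), nil
}

func main() {
	ctx := context.Background()

	// Placeholder URL and range; a real caller would use the blob URL
	// and the part's StartsAt()/StopsAt() offsets.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.com/blob", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", 0, 1023))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var p progress
	n, err := io.Copy(io.Discard, io.TeeReader(resp.Body, &p))
	fmt.Println("copied", n, "bytes; progress saw", p.n.Load(), "; err:", err)
}
```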
@@ -315,15 +324,21 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
 	for {
 		select {
 		case <-ticker.C:
-			if part.Completed >= part.Size {
+			if part.Completed.Load() >= part.Size {
 				return nil
 			}
 
-			if !part.lastUpdated.IsZero() && time.Since(part.lastUpdated) > 5*time.Second {
+			part.lastUpdatedMu.Lock()
+			lastUpdated := part.lastUpdated
+			part.lastUpdatedMu.Unlock()
+
+			if !lastUpdated.IsZero() && time.Since(lastUpdated) > 5*time.Second {
 				const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
 				slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
 				// reset last updated
+				part.lastUpdatedMu.Lock()
 				part.lastUpdated = time.Time{}
+				part.lastUpdatedMu.Unlock()
 				return errPartStalled
 			}
 		case <-ctx.Done():
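The stall check now snapshots lastUpdated under the mutex before comparing it to the 5-second threshold. Here is a rough sketch of that watchdog pattern in isolation, with illustrative names and a much shorter timeout so it can be run directly.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errStalled = errors.New("transfer stalled")

// watcher records the time of the last write under a mutex so a
// monitoring goroutine can read it safely.
type watcher struct {
	mu          sync.Mutex
	lastUpdated time.Time
}

func (w *watcher) touch() {
	w.mu.Lock()
	w.lastUpdated = time.Now()
	w.mu.Unlock()
}

// wait polls until either done is closed or no touch() happened for timeout.
func (w *watcher) wait(done <-chan struct{}, timeout time.Duration) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return nil
		case <-ticker.C:
			w.mu.Lock()
			last := w.lastUpdated
			w.mu.Unlock()
			if !last.IsZero() && time.Since(last) > timeout {
				return errStalled
			}
		}
	}
}

func main() {
	w := &watcher{}
	w.touch()
	done := make(chan struct{})
	// nothing touches w again, so this reports a stall after ~50ms
	fmt.Println(w.wait(done, 50*time.Millisecond))
}
```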
@@ -388,6 +403,8 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
 	ticker := time.NewTicker(60 * time.Millisecond)
 	for {
 		select {
+		case <-b.done:
+			return b.err
 		case <-ticker.C:
 			fn(api.ProgressResponse{
 				Status: fmt.Sprintf("pulling %s", b.Digest[7:19]),
@@ -395,10 +412,6 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
 				Total:     b.Total,
 				Completed: b.Completed.Load(),
 			})
-
-			if b.done || b.err != nil {
-				return b.err
-			}
 		case <-ctx.Done():
 			return ctx.Err()
 		}
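Taken together with the Prepare and Run hunks, this replaces polling a done flag with the usual Go completion-channel idiom: allocate the channel up front, close it when the worker returns, and select on it next to the ticker and the context. A minimal sketch of that idiom follows, with illustrative names rather than the repository's API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// job runs work on its own goroutine and signals completion by closing
// done; err is safe to read once done is closed.
type job struct {
	done chan struct{}
	err  error
}

func (j *job) run() {
	defer close(j.done)
	time.Sleep(30 * time.Millisecond) // stand-in for the real download
	j.err = nil
}

// wait reports progress on a ticker until the job finishes or ctx ends.
func (j *job) wait(ctx context.Context) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-j.done:
			return j.err
		case <-ticker.C:
			fmt.Println("still working...")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	j := &job{done: make(chan struct{})}
	go j.run()
	fmt.Println("wait returned:", j.wait(context.Background()))
}
```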
@@ -263,27 +263,13 @@ func detectChatTemplate(layers []*layerGGML) ([]*layerGGML, error) {
 		if t, err := template.Named(s); err != nil {
 			slog.Debug("template detection", "error", err)
 		} else {
-			layer, err := NewLayer(t.Reader(), "application/vnd.ollama.image.template")
+			tmpl, err := NewLayer(t.Reader(), "application/vnd.ollama.image.template")
 			if err != nil {
 				return nil, err
 			}
 
-			layer.status = fmt.Sprintf("using autodetected template %s", t.Name)
-			layers = append(layers, &layerGGML{layer, nil})
-
-			if t.Parameters != nil {
-				var b bytes.Buffer
-				if err := json.NewEncoder(&b).Encode(t.Parameters); err != nil {
-					return nil, err
-				}
-
-				layer, err := NewLayer(&b, "application/vnd.ollama.image.params")
-				if err != nil {
-					return nil, err
-				}
-
-				layers = append(layers, &layerGGML{layer, nil})
-			}
+			tmpl.status = fmt.Sprintf("using autodetected template %s", t.Name)
+			layers = append(layers, &layerGGML{tmpl, nil})
 		}
 	}
 }
@@ -599,10 +599,9 @@ func TestCreateDetectTemplate(t *testing.T) {
 	}
 
 	checkFileExists(t, filepath.Join(p, "blobs", "*"), []string{
 		filepath.Join(p, "blobs", "sha256-0d79f567714c62c048378f2107fb332dabee0135d080c302d884317da9433cc5"),
 		filepath.Join(p, "blobs", "sha256-553c4a3f747b3d22a4946875f1cc8ed011c2930d83f864a0c7265f9ec0a20413"),
 		filepath.Join(p, "blobs", "sha256-c608dc615584cd20d9d830363dabf8a4783ae5d34245c3d8c115edb3bc7b28e4"),
 		filepath.Join(p, "blobs", "sha256-ea34c57ba5b78b740aafe2aeb74dc6507fc3ad14170b64c26a04fb9e36c88d75"),
 		filepath.Join(p, "blobs", "sha256-f836ee110db21567f826332e4cedd746c06d10664fd5a9ea3659e3683a944510"),
 	})
 })
@@ -132,8 +132,6 @@ func (s *Scheduler) processPending(ctx context.Context) {
 			if len(pending.model.ProjectorPaths) > 0 && numParallel != 1 {
 				numParallel = 1
 				slog.Warn("multimodal models don't support parallel requests yet")
-			} else if strings.Contains(pending.model.Config.ModelFamily, "bert") {
-				numParallel = runtime.NumCPU()
 			}
 
 			for {
@@ -1,8 +0,0 @@
-{
-  "stop": [
-    "<start_system>",
-    "<end_message>",
-    "<start_user>",
-    "<start_assistant>"
-  ]
-}

@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "### Instruction:",
-    "### Response"
-  ]
-}

@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "<|im_start|>",
-    "<|im_end|>"
-  ]
-}

@@ -1,8 +0,0 @@
-{
-  "stop": [
-    "System:",
-    "User:",
-    "Assistant:",
-    "<|begin_of_text|>"
-  ]
-}

@@ -1,7 +0,0 @@
-{
-  "stop": [
-    "Source:",
-    "Destination:",
-    "<step>"
-  ]
-}

@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "User:",
-    "Assistant:"
-  ]
-}

@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "<start_of_turn>",
-    "<end_of_turn>"
-  ]
-}

@@ -1,7 +0,0 @@
-{
-  "stop": [
-    "System:",
-    "Question:",
-    "Answer:"
-  ]
-}

@@ -1,8 +0,0 @@
-{
-  "stop": [
-    "[INST]",
-    "[/INST]",
-    "<<SYS>>",
-    "<</SYS>>"
-  ]
-}

@@ -1,7 +0,0 @@
-{
-  "stop": [
-    "<|start_header_id|>",
-    "<|end_header_id|>",
-    "<|eot_id|>"
-  ]
-}

@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "@@ Instruction",
-    "@@ Response"
-  ]
-}

@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "<|im_start|>",
-    "<|im_end|>"
-  ]
-}

@@ -1,5 +0,0 @@
-{
-  "stop": [
-    "<|end_of_turn|>"
-  ]
-}

@@ -1,8 +0,0 @@
-{
-  "stop": [
-    "<|end|>",
-    "<|system|>",
-    "<|user|>",
-    "<|assistant|>"
-  ]
-}

@@ -1,7 +0,0 @@
-{
-  "stop": [
-    "### System:",
-    "### User:",
-    "### Assistant"
-  ]
-}

@@ -1,7 +0,0 @@
-{
-  "stop": [
-    "### Instruction",
-    "### Response",
-    "<|endoftext|>"
-  ]
-}
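Each of the deleted files above is a small JSON object with a single "stop" array, which previously populated the Parameters field removed from template.go below. A hedged sketch of how such a payload decodes, using a stand-in struct rather than the package's own type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// params mirrors the shape of the deleted *.json files: a list of
// stop sequences for a chat template.
type params struct {
	Stop []string `json:"stop"`
}

func main() {
	// Example payload in the same shape as the deleted chatml params file.
	raw := []byte(`{"stop": ["<|im_start|>", "<|im_end|>"]}`)

	var p params
	if err := json.Unmarshal(raw, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Stop) // [<|im_start|> <|im_end|>]
}
```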
@@ -23,7 +23,6 @@ import (
 var indexBytes []byte
 
 //go:embed *.gotmpl
-//go:embed *.json
 var templatesFS embed.FS
 
 var templatesOnce = sync.OnceValues(func() ([]*named, error) {

@@ -40,15 +39,6 @@ var templatesOnce = sync.OnceValues(func() ([]*named, error) {
 
 		// normalize line endings
 		t.Bytes = bytes.ReplaceAll(bts, []byte("\r\n"), []byte("\n"))
-
-		params, err := templatesFS.ReadFile(t.Name + ".json")
-		if err != nil {
-			continue
-		}
-
-		if err := json.Unmarshal(params, &t.Parameters); err != nil {
-			return nil, err
-		}
 	}
 
 	return templates, nil

@@ -58,10 +48,6 @@ type named struct {
 	Name     string `json:"name"`
 	Template string `json:"template"`
 	Bytes    []byte
-
-	Parameters *struct {
-		Stop []string `json:"stop"`
-	}
 }
 
 func (t named) Reader() io.Reader {
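After this change the template loader still relies on go:embed to bundle the *.gotmpl files and on sync.OnceValues to parse them lazily exactly once. The following self-contained sketch shows the sync.OnceValues part on its own, with a hard-coded template string standing in for an embedded file.

```go
package main

import (
	"fmt"
	"os"
	"sync"
	"text/template"
)

// parseOnce parses the template a single time no matter how many
// callers ask for it; later calls return the cached results.
var parseOnce = sync.OnceValues(func() (*template.Template, error) {
	return template.New("chat").Parse("{{ .System }}\n{{ .Prompt }}\n")
})

func main() {
	for i := 0; i < 2; i++ {
		tmpl, err := parseOnce() // second call reuses the first result
		if err != nil {
			panic(err)
		}
		_ = tmpl.Execute(os.Stdout, map[string]string{"System": "You are helpful.", "Prompt": "hi"})
	}
	fmt.Println("parsed once, used twice")
}
```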
@@ -1,6 +0,0 @@
-{
-  "stop": [
-    "USER:",
-    "ASSISTANT:"
-  ]
-}

@@ -1,8 +0,0 @@
-{
-  "stop": [
-    "<|system|>",
-    "</s>",
-    "<|user|>",
-    "<|assistant|>"
-  ]
-}