add evals, reorder menu

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-03-21 00:59:17 +00:00
parent fa8b1a8673
commit f1223b45b2
13 changed files with 772 additions and 274 deletions

View File

@@ -21,7 +21,7 @@ import backend_pb2
import backend_pb2_grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '1'))
MAX_WORKERS = int(os.environ.get('PYTHON_GRPC_MAX_WORKERS', '4'))
class ProgressCallback:
@@ -38,16 +38,22 @@ class ProgressCallback:
parent = self
class _Callback(TrainerCallback):
def __init__(self):
self._train_start_time = None
def on_train_begin(self, args, state, control, **kwargs):
self._train_start_time = time.time()
def on_log(self, args, state, control, logs=None, **kwargs):
if logs is None:
return
total_steps = state.max_steps if state.max_steps > 0 else 0
progress = (state.global_step / total_steps * 100) if total_steps > 0 else 0
eta = 0.0
if state.global_step > 0 and total_steps > 0:
elapsed = time.time() - state.logging_steps # approximate
if state.global_step > 0 and total_steps > 0 and self._train_start_time:
elapsed = time.time() - self._train_start_time
remaining_steps = total_steps - state.global_step
if state.global_step > 1:
if state.global_step > 0:
eta = remaining_steps * (elapsed / state.global_step)
extra_metrics = {}
@@ -72,6 +78,58 @@ class ProgressCallback:
)
parent.progress_queue.put(update)
def on_prediction_step(self, args, state, control, **kwargs):
"""Send periodic updates during evaluation so the UI doesn't freeze."""
if not hasattr(self, '_eval_update_counter'):
self._eval_update_counter = 0
self._eval_update_counter += 1
# Throttle: send an update every 10 prediction steps
if self._eval_update_counter % 10 != 0:
return
total_steps = state.max_steps if state.max_steps > 0 else 0
progress = (state.global_step / total_steps * 100) if total_steps > 0 else 0
update = backend_pb2.FineTuneProgressUpdate(
job_id=parent.job_id,
current_step=state.global_step,
total_steps=total_steps,
current_epoch=float(state.epoch or 0),
total_epochs=float(parent.total_epochs),
progress_percent=float(progress),
status="training",
message=f"Evaluating... (batch {self._eval_update_counter})",
)
parent.progress_queue.put(update)
def on_evaluate(self, args, state, control, metrics=None, **kwargs):
"""Report eval results once evaluation is done."""
# Reset prediction counter for next eval round
self._eval_update_counter = 0
total_steps = state.max_steps if state.max_steps > 0 else 0
progress = (state.global_step / total_steps * 100) if total_steps > 0 else 0
eval_loss = 0.0
extra_metrics = {}
if metrics:
eval_loss = float(metrics.get('eval_loss', 0))
for k, v in metrics.items():
if isinstance(v, (int, float)) and k not in ('eval_loss', 'epoch'):
extra_metrics[k] = float(v)
update = backend_pb2.FineTuneProgressUpdate(
job_id=parent.job_id,
current_step=state.global_step,
total_steps=total_steps,
current_epoch=float(state.epoch or 0),
total_epochs=float(parent.total_epochs),
eval_loss=eval_loss,
progress_percent=float(progress),
status="training",
message=f"Evaluation complete at step {state.global_step}",
extra_metrics=extra_metrics,
)
parent.progress_queue.put(update)
def on_save(self, args, state, control, **kwargs):
checkpoint_path = os.path.join(args.output_dir, f"checkpoint-{state.global_step}")
update = backend_pb2.FineTuneProgressUpdate(
@@ -256,6 +314,38 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
else:
dataset = load_dataset(request.dataset_source, split=dataset_split)
# Eval dataset setup
eval_dataset = None
eval_strategy = extra.get("eval_strategy", "steps")
eval_steps = int(extra.get("eval_steps", str(request.save_steps if request.save_steps > 0 else 500)))
if eval_strategy != "no":
eval_split = extra.get("eval_split")
eval_dataset_source = extra.get("eval_dataset_source")
if eval_split:
# Load a specific split as eval dataset
if os.path.exists(request.dataset_source):
if request.dataset_source.endswith('.json') or request.dataset_source.endswith('.jsonl'):
eval_dataset = load_dataset("json", data_files=request.dataset_source, split=eval_split)
elif request.dataset_source.endswith('.csv'):
eval_dataset = load_dataset("csv", data_files=request.dataset_source, split=eval_split)
else:
eval_dataset = load_dataset(request.dataset_source, split=eval_split)
else:
eval_dataset = load_dataset(request.dataset_source, split=eval_split)
elif eval_dataset_source:
# Load eval dataset from a separate source
eval_dataset = load_dataset(eval_dataset_source, split="train")
else:
# Auto-split from training set
eval_split_ratio = float(extra.get("eval_split_ratio", "0.1"))
split = dataset.train_test_split(test_size=eval_split_ratio)
dataset = split["train"]
eval_dataset = split["test"]
if eval_strategy == "no":
eval_dataset = None
# Training config
output_dir = request.output_dir or f"./output-{job.job_id}"
num_epochs = request.num_epochs if request.num_epochs > 0 else 3
@@ -308,6 +398,12 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
if save_total_limit:
_save_kwargs["save_total_limit"] = save_total_limit
# Eval kwargs
_eval_kwargs = {}
if eval_dataset is not None:
_eval_kwargs["eval_strategy"] = eval_strategy
_eval_kwargs["eval_steps"] = eval_steps
# Common training arguments shared by all methods
_common_args = dict(
output_dir=output_dir,
@@ -324,6 +420,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
report_to="none",
**_save_kwargs,
**common_train_kwargs,
**_eval_kwargs,
)
# Select trainer based on training method
@@ -343,6 +440,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
model=model,
args=training_args,
train_dataset=dataset,
eval_dataset=eval_dataset,
processing_class=tokenizer,
callbacks=[progress_cb.get_callback()],
)
@@ -365,6 +463,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
model=model,
args=training_args,
train_dataset=dataset,
eval_dataset=eval_dataset,
processing_class=tokenizer,
callbacks=[progress_cb.get_callback()],
)
@@ -420,6 +519,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
model=model,
args=training_args,
train_dataset=dataset,
eval_dataset=eval_dataset,
processing_class=tokenizer,
callbacks=[progress_cb.get_callback()],
)
@@ -440,6 +540,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
model=model,
args=training_args,
train_dataset=dataset,
eval_dataset=eval_dataset,
processing_class=tokenizer,
callbacks=[progress_cb.get_callback()],
)
@@ -478,6 +579,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
model=model,
args=training_args,
train_dataset=dataset,
eval_dataset=eval_dataset,
processing_class=tokenizer,
callbacks=[progress_cb.get_callback()],
)
@@ -528,9 +630,8 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
continue
def StopFineTune(self, request, context):
# No-op: stopping is handled by killing the backend process from Go.
# This stub remains to satisfy the proto-generated gRPC interface.
return backend_pb2.Result(success=True, message="No-op (process kill used instead)")
# Stopping is handled by killing the process from Go via ShutdownModel.
return backend_pb2.Result(success=True, message="OK")
def ListCheckpoints(self, request, context):
output_dir = request.output_dir