Add Worker service with isolated task management and integrate into app

This commit is contained in:
2025-09-22 19:30:18 +02:00
parent cfd38211a2
commit d374ff6bf9
5 changed files with 370 additions and 58 deletions

View File

@@ -6,6 +6,8 @@ import 'package:fluttery/secure_storage.dart';
import 'package:fluttery/src/logger/logger_impl.dart';
import 'package:fluttery/src/preferences/preferences_impl.dart';
import 'package:fluttery/src/storage/secure/secure_storage_impl.dart';
import 'package:fluttery/src/system/worker/worker_impl.dart';
import 'package:fluttery/worker.dart';
import 'package:kiwi/kiwi.dart';
import 'package:shared_preferences/shared_preferences.dart';
@@ -31,12 +33,12 @@ class App {
/// Registers the default services required by the application.
static Future<void> registerDefaultServices() async {
registerService<Logger>(() => LoggerImpl());
final prefs = await SharedPreferences.getInstance();
registerService<Preferences>(() => PreferencesImpl(instance: prefs));
registerService<Logger>(() => LoggerImpl());
registerService<Preferences>(() => PreferencesImpl(instance: prefs));
registerService<SecureStorage>(() => SecureStorageImpl());
registerService<Worker>(() => WorkerImpl());
}
}

View File

@@ -0,0 +1,142 @@
import 'dart:async';
import 'dart:isolate';
import 'package:flutter/services.dart'
show ServicesBinding, RootIsolateToken, BackgroundIsolateBinaryMessenger;
import 'package:fluttery/fluttery.dart';
import 'package:fluttery/logger.dart';
import 'package:fluttery/worker.dart';
class WorkerImpl implements Worker {
WorkerImpl({
this.defaultTimeout,
this.maxHistory = 100,
RootIsolateToken? rootToken, // optional for tests
}) : _rootToken =
rootToken ?? ServicesBinding.rootIsolateToken; // <— static getter
final Duration? defaultTimeout;
final int maxHistory;
// Captured from the root isolate (may be null in some test envs)
final RootIsolateToken? _rootToken;
int _seq = 0;
final Map<String, WorkerInfo> _active = {};
final List<WorkerInfo> _history = [];
@override
Future<T> spawn<T>(
String debugName,
FutureOr<T> Function() task, {
void Function()? preTask,
Duration? timeout,
}) {
final id = (++_seq).toString().padLeft(6, '0');
final started = DateTime.now();
_active[id] = WorkerInfo(
id: id,
name: debugName,
startedAt: started,
status: WorkerStatus.running,
);
Future<T> inner() async {
final token = _rootToken; // captured into closure
return Isolate.run<T>(() async {
// Initialize platform channels for this background isolate.
if (token != null) {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
}
// Now it's safe to touch plugins (e.g., SharedPreferences).
await App.registerDefaultServices();
preTask?.call();
return await Future.sync(task);
}, debugName: debugName);
}
final effectiveTimeout = timeout ?? defaultTimeout;
final fut = effectiveTimeout == null
? inner()
: inner().timeout(effectiveTimeout);
fut
.then((_) {
_finish(id, status: WorkerStatus.completed);
})
.catchError((e, st) {
_finish(
id,
status: e is TimeoutException
? WorkerStatus.timedOut
: WorkerStatus.failed,
error: e,
stack: st,
);
// Best-effort logging
try {
App.service<Logger>().error(
'Worker job "$debugName" ($id) failed: $e',
st,
);
} catch (_) {}
});
return fut;
}
void _finish(
String id, {
required WorkerStatus status,
Object? error,
StackTrace? stack,
}) {
final prev = _active.remove(id);
final endedAt = DateTime.now();
final info = WorkerInfo(
id: prev?.id ?? id,
name: prev?.name ?? 'unknown',
startedAt: prev?.startedAt ?? endedAt,
status: status,
endedAt: endedAt,
error: error,
stackTrace: stack,
);
_history.insert(0, info);
if (_history.length > maxHistory) {
_history.removeRange(maxHistory, _history.length);
}
}
@override
List<WorkerInfo> getActiveWorkers() =>
_active.values.toList()
..sort((a, b) => a.startedAt.compareTo(b.startedAt));
@override
List<WorkerInfo> getAllWorkers() => [...getActiveWorkers(), ..._history];
@override
WorkerInfo? getWorker(String id) {
final active = _active[id];
if (active != null) return active;
for (final w in _history) {
if (w.id == id) return w;
}
return null;
}
@override
void purge({Duration maxAge = const Duration(minutes: 30)}) {
final cutoff = DateTime.now().subtract(maxAge);
_history.removeWhere((w) => (w.endedAt ?? w.startedAt).isBefore(cutoff));
}
}

View File

@@ -0,0 +1,47 @@
import 'dart:async';
import 'package:fluttery/fluttery.dart';
abstract class Worker extends Service {
Future<T> spawn<T>(
String debugName,
FutureOr<T> Function() task, {
void Function()? preTask,
Duration? timeout, // per-job override
});
/// Currently running jobs.
List<WorkerInfo> getActiveWorkers();
/// All known jobs (active + completed + failed), up to a capped history.
List<WorkerInfo> getAllWorkers();
/// Optional: get a single worker by id.
WorkerInfo? getWorker(String id);
/// Remove completed/failed jobs older than [maxAge] from history.
void purge({Duration maxAge = const Duration(minutes: 30)});
}
enum WorkerStatus { running, completed, failed, timedOut }
class WorkerInfo {
WorkerInfo({
required this.id,
required this.name,
required this.startedAt,
required this.status,
this.endedAt,
this.error,
this.stackTrace,
});
final String id;
final String name;
final DateTime startedAt;
final WorkerStatus status;
final DateTime? endedAt;
final Object? error;
final StackTrace? stackTrace;
Duration get duration => ((endedAt ?? DateTime.now()).difference(startedAt));
}