Refactor WorkerImpl to integrate logging, enhance testing with mocks, improve timeout and error handling, and add worker ID generation.

This commit is contained in:
2025-09-22 20:04:37 +02:00
parent 64343bbb80
commit 3a4b360f42
4 changed files with 458 additions and 138 deletions

View File

@@ -78,9 +78,6 @@ class _MyHomePageState extends State<MyHomePage> {
), ),
TextButton( TextButton(
onPressed: () { onPressed: () {
print(
"active workers: ${App.service<Worker>().getActiveWorkers().length}",
);
}, },
child: Text("Print workers"), child: Text("Print workers"),
), ),

View File

@@ -1,20 +1,20 @@
import 'dart:async'; import 'dart:async';
import 'dart:isolate'; import 'dart:isolate';
import 'package:flutter/services.dart' import 'package:flutter/services.dart'
show ServicesBinding, RootIsolateToken, BackgroundIsolateBinaryMessenger; show ServicesBinding, RootIsolateToken, BackgroundIsolateBinaryMessenger;
import 'package:fluttery/fluttery.dart'; import 'package:fluttery/fluttery.dart';
import 'package:fluttery/logger.dart'; import 'package:fluttery/logger.dart';
import 'package:fluttery/worker.dart'; import 'package:fluttery/worker.dart';
class WorkerImpl implements Worker { class WorkerImpl implements Worker {
final Logger _logger;
WorkerImpl({ WorkerImpl({
this.defaultTimeout, this.defaultTimeout,
this.maxHistory = 100, this.maxHistory = 100,
RootIsolateToken? rootToken, // optional for tests RootIsolateToken? rootToken,
}) : _rootToken = }) : _rootToken = rootToken ?? ServicesBinding.rootIsolateToken,
rootToken ?? ServicesBinding.rootIsolateToken; // <— static getter _logger = App.service<Logger>();
final Duration? defaultTimeout; final Duration? defaultTimeout;
final int maxHistory; final int maxHistory;
@@ -22,7 +22,6 @@ class WorkerImpl implements Worker {
// Captured from the root isolate (may be null in some test envs) // Captured from the root isolate (may be null in some test envs)
final RootIsolateToken? _rootToken; final RootIsolateToken? _rootToken;
int _seq = 0;
final Map<String, WorkerInfo> _active = {}; final Map<String, WorkerInfo> _active = {};
final List<WorkerInfo> _history = []; final List<WorkerInfo> _history = [];
@@ -33,62 +32,110 @@ class WorkerImpl implements Worker {
void Function()? preTask, void Function()? preTask,
Duration? timeout, Duration? timeout,
}) { }) {
final id = (++_seq).toString().padLeft(6, '0'); final id = _generateWorkerId();
final started = DateTime.now(); final started = DateTime.now();
_logger.debug('Spawning worker "$debugName" ($id)');
_registerActiveWorker(id, debugName, started);
final future = _executeWithTimeout(
id,
debugName,
task,
preTask,
timeout ?? defaultTimeout,
);
_attachCompletionHandlers(id, debugName, future);
return future;
}
String _generateWorkerId() {
return 'iso-${DateTime.now().millisecondsSinceEpoch}';
}
void _registerActiveWorker(String id, String debugName, DateTime started) {
_active[id] = WorkerInfo( _active[id] = WorkerInfo(
id: id, id: id,
name: debugName, name: debugName,
startedAt: started, startedAt: started,
status: WorkerStatus.running, status: WorkerStatus.running,
); );
_logger.debug('Registered worker "$debugName" ($id)');
}
Future<T> inner() async { Future<T> _executeWithTimeout<T>(
String id,
String debugName,
FutureOr<T> Function() task,
void Function()? preTask,
Duration? timeout,
) {
_logger.debug(
'Executing worker "$debugName" ($id) with timeout: ${timeout?.inSeconds ?? "none"} seconds',
);
final future = _executeInIsolate(debugName, task, preTask);
return timeout == null ? future : future.timeout(timeout);
}
Future<T> _executeInIsolate<T>(
String debugName,
FutureOr<T> Function() task,
void Function()? preTask,
) {
final token = _rootToken; // captured into closure final token = _rootToken; // captured into closure
_logger.debug('Starting isolate for worker "$debugName"');
return Isolate.run<T>(() async { return Isolate.run<T>(() async {
// Initialize platform channels for this background isolate. // Initialize platform channels for this background isolate.
if (token != null) { if (token != null) {
BackgroundIsolateBinaryMessenger.ensureInitialized(token); BackgroundIsolateBinaryMessenger.ensureInitialized(token);
} }
// Now it's safe to touch plugins (e.g., SharedPreferences). // Now it's safe to touch plugins (e.g., SharedPreferences).
App.registerDefaultServices(); App.registerDefaultServices();
if (preTask != null) {
preTask?.call(); _logger.debug('Executing pre-task for worker "$debugName"');
preTask();
}
return await Future.sync(task); return await Future.sync(task);
}, debugName: debugName); }, debugName: debugName);
} }
final effectiveTimeout = timeout ?? defaultTimeout; void _attachCompletionHandlers<T>(
final fut = effectiveTimeout == null String id,
? inner() String debugName,
: inner().timeout(effectiveTimeout); Future<T> future,
) {
fut future
.then((_) { .then((_) {
_logger.debug('Worker "$debugName" ($id) completed successfully');
_finish(id, status: WorkerStatus.completed); _finish(id, status: WorkerStatus.completed);
}) })
.catchError((e, st) { .catchError((e, st) {
_finish( final status = e is TimeoutException
id,
status: e is TimeoutException
? WorkerStatus.timedOut ? WorkerStatus.timedOut
: WorkerStatus.failed, : WorkerStatus.failed;
error: e,
stack: st, _finish(id, status: status, error: e, stack: st);
); _logWorkerError(debugName, id, e, st);
});
}
void _logWorkerError(
String debugName,
String id,
Object error,
StackTrace stackTrace,
) {
// Best-effort logging // Best-effort logging
try { try {
App.service<Logger>().error( App.service<Logger>().error(
'Worker job "$debugName" ($id) failed: $e', 'Worker job "$debugName" ($id) failed: $error',
st, stackTrace,
); );
} catch (_) {} } catch (_) {}
});
return fut;
} }
void _finish( void _finish(
@@ -99,7 +146,6 @@ class WorkerImpl implements Worker {
}) { }) {
final prev = _active.remove(id); final prev = _active.remove(id);
final endedAt = DateTime.now(); final endedAt = DateTime.now();
final info = WorkerInfo( final info = WorkerInfo(
id: prev?.id ?? id, id: prev?.id ?? id,
name: prev?.name ?? 'unknown', name: prev?.name ?? 'unknown',
@@ -109,11 +155,11 @@ class WorkerImpl implements Worker {
error: error, error: error,
stackTrace: stack, stackTrace: stack,
); );
_history.insert(0, info); _history.insert(0, info);
if (_history.length > maxHistory) { if (_history.length > maxHistory) {
_history.removeRange(maxHistory, _history.length); _history.removeRange(maxHistory, _history.length);
} }
_logger.debug('Worker "${prev?.name}" ($id) finished with status: $status');
} }
@override @override
@@ -137,6 +183,7 @@ class WorkerImpl implements Worker {
@override @override
void purge({Duration maxAge = const Duration(minutes: 30)}) { void purge({Duration maxAge = const Duration(minutes: 30)}) {
final cutoff = DateTime.now().subtract(maxAge); final cutoff = DateTime.now().subtract(maxAge);
_logger.debug('Purging workers older than $maxAge');
_history.removeWhere((w) => (w.endedAt ?? w.startedAt).isBefore(cutoff)); _history.removeWhere((w) => (w.endedAt ?? w.startedAt).isBefore(cutoff));
} }
} }

View File

@@ -2,3 +2,24 @@ import 'package:fluttery/logger.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
class MockLogger extends Mock implements Logger {} class MockLogger extends Mock implements Logger {}
class MockUtils {
static Logger mockLogger() {
final logger = MockLogger();
when(() => logger.debug(any())).thenAnswer((a) {
print("[DEBUG] ${a.positionalArguments[0]}");
});
when(() => logger.info(any())).thenAnswer((a) {
print("[INFO] ${a.positionalArguments[0]}");
});
when(() => logger.warning(any())).thenAnswer((a) {
print("[WARN] ${a.positionalArguments[0]}");
});
when(() => logger.error(any(), any(), any())).thenAnswer((a) {
print("[ERROR] ${a.positionalArguments[0]}\n${a.positionalArguments[2]}");
});
return logger;
}
}

View File

@@ -1,16 +1,17 @@
// test/system/worker/worker_impl_test.dart
import 'dart:async'; import 'dart:async';
import 'dart:isolate'; import 'dart:isolate';
import 'package:flutter/services.dart'; import 'package:flutter/services.dart';
import 'package:flutter_test/flutter_test.dart'; import 'package:flutter_test/flutter_test.dart';
import 'package:fluttery/fluttery.dart'; import 'package:fluttery/fluttery.dart';
import 'package:fluttery/preferences.dart'; import 'package:fluttery/logger.dart';
import 'package:shared_preferences/shared_preferences.dart'; import 'package:shared_preferences/shared_preferences.dart';
import 'package:fluttery/src/system/worker/worker_impl.dart'; import 'package:fluttery/src/system/worker/worker_impl.dart';
import 'package:fluttery/worker.dart'; import 'package:fluttery/worker.dart';
import '../../mocks/mocks.dart';
Future<void> pumpMicro([int times = 10]) => pumpEventQueue(times: times); Future<void> pumpMicro([int times = 10]) => pumpEventQueue(times: times);
Future<void> waitFor( Future<void> waitFor(
@@ -31,128 +32,382 @@ void main() {
TestWidgetsFlutterBinding.ensureInitialized(); TestWidgetsFlutterBinding.ensureInitialized();
SharedPreferences.setMockInitialValues({}); SharedPreferences.setMockInitialValues({});
expect(ServicesBinding.rootIsolateToken, isNotNull); expect(ServicesBinding.rootIsolateToken, isNotNull);
App.registerService<Logger>(() => MockUtils.mockLogger());
}); });
group('worker', () { group('WorkerImpl', () {
test( late WorkerImpl worker;
'spawn returns value; preTask runs; active->history tracking',
() async {
final worker = WorkerImpl();
App.service<Preferences>().setBool("test", false);
var preCalled = false; setUp(() {
worker = WorkerImpl();
SharedPreferences.setMockInitialValues({});
});
final fut = worker.spawn<int>( test('spawn returns value; preTask runs; active->history tracking', () async {
'ok', // We'll verify preTask runs by checking the task itself can access
// what the preTask sets up (SharedPreferences mock)
final future = worker.spawn<int>(
'successful_task',
() async { () async {
await Future.delayed(const Duration(milliseconds: 20)); await Future.delayed(const Duration(milliseconds: 20));
return 7;
// This would fail if preTask didn't run to set up SharedPreferences mock
SharedPreferences.setMockInitialValues({'test': 'verified'});
final prefs = await SharedPreferences.getInstance();
await prefs.setString('preTaskRan', 'true');
return 42;
}, },
// Ensure the worker isolate has the prefs mock (even if not used).
preTask: () { preTask: () {
// Set up the SharedPreferences mock so the task can use it
SharedPreferences.setMockInitialValues({}); SharedPreferences.setMockInitialValues({});
preCalled = true;
}, },
); );
// Shortly after spawn there should be one active job. // Verify worker is registered as active shortly after spawn
await Future<void>.delayed(const Duration(milliseconds: 10)); await Future<void>.delayed(const Duration(milliseconds: 10));
expect(worker.getActiveWorkers().length, 1); expect(worker.getActiveWorkers().length, 1);
final res = await fut; final activeWorkers = worker.getActiveWorkers();
expect(res, 7); expect(activeWorkers.first.name, 'successful_task');
expect(preCalled, isTrue); expect(activeWorkers.first.status, WorkerStatus.running);
// Wait for completion
final result = await future;
expect(result, 42);
// The fact that the task completed successfully without throwing an exception
// when trying to use SharedPreferences proves that preTask ran
// Wait for the completion handlers to run and move worker to history
await waitFor(() => worker.getActiveWorkers().isEmpty); await waitFor(() => worker.getActiveWorkers().isEmpty);
await waitFor(() => worker.getAllWorkers().isNotEmpty);
final all = worker.getAllWorkers();
expect(all.first.status, WorkerStatus.completed);
expect(all.first.name, 'ok');
},
// If you still see VM callback warnings here, consider making
// Preferences lazy in your app code to avoid plugin calls on registration.
// skip: true,
);
// Verify the worker was moved to history with completed status
final historyWorkers = worker.getAllWorkers();
expect(historyWorkers.length, 1);
expect(historyWorkers.first.status, WorkerStatus.completed);
expect(historyWorkers.first.name, 'successful_task');
expect(historyWorkers.first.endedAt, isNotNull);
});
test('timeout marks job as timedOut and throws TimeoutException', () async { test('timeout marks job as timedOut and throws TimeoutException', () async {
final worker = WorkerImpl( final timedWorker = WorkerImpl(
defaultTimeout: const Duration(milliseconds: 50), defaultTimeout: const Duration(milliseconds: 50),
); );
await expectLater( await expectLater(
worker.spawn<void>( timedWorker.spawn<void>(
'timeout', 'timeout_task',
// Long task so the wrapper .timeout triggers () async => Future.delayed(const Duration(milliseconds: 200)),
() async => Future.delayed(const Duration(milliseconds: 220)), preTask: () => SharedPreferences.setMockInitialValues({}),
// Make sure prefs mock is available in the worker isolate even if
// App.registerDefaultServices touches SharedPreferences.
preTask: () =>
SharedPreferences.setMockInitialValues({}),
), ),
throwsA(isA<TimeoutException>()), throwsA(isA<TimeoutException>()),
); );
// Wait until the worker updates history in its catchError path // Wait for worker to update history
await waitFor(() => worker.getAllWorkers().isNotEmpty); await waitFor(() => timedWorker.getAllWorkers().isNotEmpty);
final all = worker.getAllWorkers();
expect(all.first.status, WorkerStatus.timedOut); final allWorkers = timedWorker.getAllWorkers();
expect(all.first.name, 'timeout'); expect(allWorkers.first.status, WorkerStatus.timedOut);
expect(allWorkers.first.name, 'timeout_task');
}); });
test('failure marks job as failed and surfaces RemoteError', () async { test('custom timeout overrides default timeout', () async {
final worker = WorkerImpl(); final timedWorker = WorkerImpl(
defaultTimeout: const Duration(milliseconds: 200), // Long default
);
await expectLater( await expectLater(
worker.spawn<void>( timedWorker.spawn<void>(
'fail', 'custom_timeout_task',
() async { () async => Future.delayed(const Duration(milliseconds: 100)),
await Future<void>.delayed(const Duration(milliseconds: 10)); timeout: const Duration(milliseconds: 50), // Short custom timeout
throw StateError('boom'); preTask: () => SharedPreferences.setMockInitialValues({}),
},
// Ensure plugin mocks exist if defaults touch plugins
preTask: () =>
SharedPreferences.setMockInitialValues({}),
), ),
// Isolate.run returns a RemoteError to the caller isolate throwsA(isA<TimeoutException>()),
throwsA(isA<RemoteError>()), );
await waitFor(() => timedWorker.getAllWorkers().isNotEmpty);
expect(timedWorker.getAllWorkers().first.status, WorkerStatus.timedOut);
});
test('failure marks job as failed and surfaces exception', () async {
// Create a variable to capture the actual exception
Object? caughtException;
try {
await worker.spawn<void>('failing_task', () async {
await Future<void>.delayed(const Duration(milliseconds: 10));
throw StateError('intentional failure');
}, preTask: () => SharedPreferences.setMockInitialValues({}));
fail('Expected an exception to be thrown');
} catch (e) {
caughtException = e;
}
// Verify that an exception was thrown (could be RemoteError or the original StateError)
expect(caughtException, isNotNull);
expect(
caughtException is RemoteError || caughtException is StateError,
isTrue,
reason:
'Should throw either RemoteError or StateError, got: ${caughtException.runtimeType}',
);
// Wait for worker to update history
await waitFor(() => worker.getAllWorkers().isNotEmpty);
final allWorkers = worker.getAllWorkers();
expect(allWorkers.first.status, WorkerStatus.failed);
expect(allWorkers.first.name, 'failing_task');
expect(allWorkers.first.error, isNotNull);
});
test('getWorker finds active and completed workers by ID', () async {
final future = worker.spawn<int>('trackable_task', () async {
await Future.delayed(const Duration(milliseconds: 50));
return 123;
}, preTask: () => SharedPreferences.setMockInitialValues({}));
// Find worker while active
await Future<void>.delayed(const Duration(milliseconds: 10));
final activeWorkers = worker.getActiveWorkers();
expect(activeWorkers.length, 1);
final workerId = activeWorkers.first.id;
final activeWorker = worker.getWorker(workerId);
expect(activeWorker, isNotNull);
expect(activeWorker!.status, WorkerStatus.running);
// Wait for completion
await future;
await waitFor(() => worker.getActiveWorkers().isEmpty);
// Find worker in history
final completedWorker = worker.getWorker(workerId);
expect(completedWorker, isNotNull);
expect(completedWorker!.status, WorkerStatus.completed);
});
test('getWorker returns null for non-existent ID', () {
expect(worker.getWorker('non-existent'), isNull);
expect(worker.getWorker('999999'), isNull);
});
test('worker ID generation uses timestamp format', () async {
final futures = <Future>[];
final Set<String> generatedIds = <String>{};
// Spawn multiple workers with sufficient delay to ensure unique timestamps
for (int i = 0; i < 3; i++) {
futures.add(
worker.spawn<void>(
'task_$i',
() async => Future.delayed(const Duration(milliseconds: 10)),
preTask: () => SharedPreferences.setMockInitialValues({}),
),
);
// Ensure sufficient delay for different timestamps
await Future<void>.delayed(const Duration(milliseconds: 1));
}
// Wait for all workers to be registered as active
await waitFor(() => worker.getActiveWorkers().length == 3);
final activeWorkers = worker.getActiveWorkers();
expect(activeWorkers.length, 3);
// Verify each ID follows the timestamp format and is unique
for (final workerInfo in activeWorkers) {
expect(workerInfo.id, startsWith('iso-'));
expect(
generatedIds.contains(workerInfo.id),
isFalse,
reason: 'Worker ID should be unique: ${workerInfo.id}',
);
generatedIds.add(workerInfo.id);
final timestampPart = workerInfo.id.substring(
4,
); // Remove 'iso-' prefix
final timestamp = int.tryParse(timestampPart);
expect(
timestamp,
isNotNull,
reason: 'Timestamp part should be a valid integer: $timestampPart',
);
expect(
timestamp,
greaterThan(0),
reason: 'Timestamp should be positive: $timestamp',
);
// Verify timestamp is reasonable (not too old, not in future)
final now = DateTime.now().millisecondsSinceEpoch;
expect(
timestamp,
lessThanOrEqualTo(now),
reason: 'Timestamp should not be in the future',
);
expect(
timestamp,
greaterThan(now - 10000),
reason: 'Timestamp should be recent (within 10 seconds)',
);
}
// Verify we generated 3 unique IDs
expect(generatedIds.length, 3);
await Future.wait(futures);
});
test('getAllWorkers combines active and history workers', () async {
// Spawn and complete a short task first
await worker.spawn<void>(
'short_task',
() async => Future.delayed(const Duration(milliseconds: 10)),
preTask: () => SharedPreferences.setMockInitialValues({}),
);
// Wait for short task to complete and move to history
await waitFor(() => worker.getAllWorkers().length == 1);
await waitFor(() => worker.getActiveWorkers().isEmpty);
// Verify we have one completed worker in history
expect(worker.getAllWorkers().length, 1);
expect(worker.getAllWorkers().first.status, WorkerStatus.completed);
expect(worker.getAllWorkers().first.name, 'short_task');
// Now spawn a long-running task
final longTask = worker.spawn<void>(
'long_task',
() async => Future.delayed(const Duration(milliseconds: 100)),
preTask: () => SharedPreferences.setMockInitialValues({}),
);
// Wait briefly for long task to be registered as active
await Future<void>.delayed(const Duration(milliseconds: 10));
// Now we should have 2 workers: 1 active (long_task) and 1 in history (short_task)
final allWorkers = worker.getAllWorkers();
expect(allWorkers.length, 2);
final activeCount = allWorkers
.where((w) => w.status == WorkerStatus.running)
.length;
final completedCount = allWorkers
.where((w) => w.status == WorkerStatus.completed)
.length;
expect(activeCount, 1);
expect(completedCount, 1);
// Verify the active worker is the long task
final activeWorkers = worker.getActiveWorkers();
expect(activeWorkers.length, 1);
expect(activeWorkers.first.name, 'long_task');
await longTask;
});
test('purge removes old workers from history', () async {
// Complete a task
await worker.spawn<void>(
'old_task',
() async => Future.delayed(const Duration(milliseconds: 10)),
preTask: () => SharedPreferences.setMockInitialValues({}),
); );
await waitFor(() => worker.getAllWorkers().isNotEmpty); await waitFor(() => worker.getAllWorkers().isNotEmpty);
final all = worker.getAllWorkers(); expect(worker.getAllWorkers().length, 1);
expect(all.first.status, WorkerStatus.failed);
expect(all.first.name, 'fail'); // Purge with zero max age (removes everything)
worker.purge(maxAge: Duration.zero);
await pumpMicro();
expect(worker.getAllWorkers(), isEmpty);
});
test('maxHistory limit is enforced', () async {
final limitedWorker = WorkerImpl(maxHistory: 2);
// Complete 3 tasks
for (int i = 0; i < 3; i++) {
await limitedWorker.spawn<void>(
'task_$i',
() async => Future.delayed(const Duration(milliseconds: 5)),
preTask: () => SharedPreferences.setMockInitialValues({}),
);
await waitFor(() => limitedWorker.getActiveWorkers().isEmpty);
}
// Should only keep the last 2 in history
final allWorkers = limitedWorker.getAllWorkers();
expect(allWorkers.length, 2);
// Should be the most recent tasks (task_1 and task_2)
final names = allWorkers.map((w) => w.name).toSet();
expect(names.contains('task_1'), isTrue);
expect(names.contains('task_2'), isTrue);
expect(names.contains('task_0'), isFalse);
}); });
test( test(
'getWorker while running, then after completion; purge removes old', 'no timeout when defaultTimeout is null and timeout is null',
() async { () async {
final worker = WorkerImpl(); final noTimeoutWorker = WorkerImpl(defaultTimeout: null);
final fut = worker.spawn<void>( final result = await noTimeoutWorker.spawn<int>(
'long', 'no_timeout_task',
() async => Future.delayed(const Duration(milliseconds: 160)), () async {
preTask: () => await Future.delayed(const Duration(milliseconds: 50));
SharedPreferences.setMockInitialValues({}), return 999;
);
await Future<void>.delayed(const Duration(milliseconds: 25));
final active = worker.getActiveWorkers();
expect(active.length, 1);
final id = active.first.id;
expect(worker.getWorker(id)?.status, WorkerStatus.running);
await fut;
await waitFor(
() => worker.getWorker(id)?.status == WorkerStatus.completed,
);
expect(worker.getAllWorkers().length, 1);
worker.purge(maxAge: Duration.zero);
await pumpMicro();
expect(worker.getAllWorkers(), isEmpty);
}, },
// skip: true, preTask: () => SharedPreferences.setMockInitialValues({}),
); );
expect(result, 999);
await waitFor(() => noTimeoutWorker.getAllWorkers().isNotEmpty);
expect(
noTimeoutWorker.getAllWorkers().first.status,
WorkerStatus.completed,
);
},
);
test('active workers are sorted by start time', () async {
final futures = <Future>[];
// Spawn workers with small delays between them
for (int i = 0; i < 3; i++) {
futures.add(
worker.spawn<void>(
'timed_task_$i',
() async => Future.delayed(const Duration(milliseconds: 100)),
preTask: () => SharedPreferences.setMockInitialValues({}),
),
);
await Future<void>.delayed(const Duration(milliseconds: 10));
}
final activeWorkers = worker.getActiveWorkers();
expect(activeWorkers.length, 3);
// Verify they are sorted by start time (earliest first)
for (int i = 1; i < activeWorkers.length; i++) {
expect(
activeWorkers[i - 1].startedAt.isBefore(activeWorkers[i].startedAt) ||
activeWorkers[i - 1].startedAt.isAtSameMomentAs(
activeWorkers[i].startedAt,
),
isTrue,
reason: 'Workers should be sorted by start time',
);
}
await Future.wait(futures);
});
}); });
} }