Skip to content

Real-time Data

Complete examples of implementing real-time data updates with Fasq. Learn how to handle WebSocket connections, server-sent events, and live data synchronization.

class WebSocketService {
static WebSocketChannel? _channel;
static StreamSubscription? _subscription;
static void connect(String url) {
_channel = WebSocketChannel.connect(Uri.parse(url));
_subscription = _channel!.stream.listen(
(data) {
try {
final message = jsonDecode(data);
_handleMessage(message);
} catch (error) {
print('WebSocket message error: $error');
}
},
onError: (error) {
print('WebSocket error: $error');
_reconnect();
},
onDone: () {
print('WebSocket connection closed');
_reconnect();
},
);
}
static void _handleMessage(Map<String, dynamic> message) {
final type = message['type'] as String;
final data = message['data'];
switch (type) {
case 'user_created':
_handleUserCreated(data);
break;
case 'user_updated':
_handleUserUpdated(data);
break;
case 'user_deleted':
_handleUserDeleted(data);
break;
case 'post_created':
_handlePostCreated(data);
break;
case 'post_updated':
_handlePostUpdated(data);
break;
case 'post_deleted':
_handlePostDeleted(data);
break;
}
}
static void _handleUserCreated(Map<String, dynamic> data) {
final user = User.fromJson(data);
final users = QueryClient().getQueryData<List<User>>('users');
if (users != null) {
QueryClient().setQueryData('users', [...users, user]);
}
}
static void _handleUserUpdated(Map<String, dynamic> data) {
final updatedUser = User.fromJson(data);
final users = QueryClient().getQueryData<List<User>>('users');
if (users != null) {
final updatedUsers = users.map((user) =>
user.id == updatedUser.id ? updatedUser : user
).toList();
QueryClient().setQueryData('users', updatedUsers);
}
// Also update individual user query
QueryClient().setQueryData('user:${updatedUser.id}', updatedUser);
}
static void _handleUserDeleted(Map<String, dynamic> data) {
final userId = data['id'] as String;
final users = QueryClient().getQueryData<List<User>>('users');
if (users != null) {
final updatedUsers = users.where((user) => user.id != userId).toList();
QueryClient().setQueryData('users', updatedUsers);
}
// Remove individual user query
QueryClient().removeQuery('user:$userId');
}
static void _handlePostCreated(Map<String, dynamic> data) {
final post = Post.fromJson(data);
final posts = QueryClient().getQueryData<List<Post>>('posts');
if (posts != null) {
QueryClient().setQueryData('posts', [...posts, post]);
}
}
static void _handlePostUpdated(Map<String, dynamic> data) {
final updatedPost = Post.fromJson(data);
final posts = QueryClient().getQueryData<List<Post>>('posts');
if (posts != null) {
final updatedPosts = posts.map((post) =>
post.id == updatedPost.id ? updatedPost : post
).toList();
QueryClient().setQueryData('posts', updatedPosts);
}
// Also update individual post query
QueryClient().setQueryData('post:${updatedPost.id}', updatedPost);
}
static void _handlePostDeleted(Map<String, dynamic> data) {
final postId = data['id'] as String;
final posts = QueryClient().getQueryData<List<Post>>('posts');
if (posts != null) {
final updatedPosts = posts.where((post) => post.id != postId).toList();
QueryClient().setQueryData('posts', updatedPosts);
}
// Remove individual post query
QueryClient().removeQuery('post:$postId');
}
static void _reconnect() {
Future.delayed(Duration(seconds: 5), () {
connect('ws://localhost:8080/ws');
});
}
static void disconnect() {
_subscription?.cancel();
_channel?.sink.close();
}
}
class RealTimeUsersScreen extends StatefulWidget {
@override
State<RealTimeUsersScreen> createState() => _RealTimeUsersScreenState();
}
class _RealTimeUsersScreenState extends State<RealTimeUsersScreen> {
@override
void initState() {
super.initState();
WebSocketService.connect('ws://localhost:8080/ws');
}
@override
void dispose() {
WebSocketService.disconnect();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Real-time Users'),
actions: [
IconButton(
icon: Icon(Icons.refresh),
onPressed: () {
QueryClient().invalidateQuery('users');
},
),
],
),
body: QueryBuilder<List<User>>(
queryKey: 'users',
queryFn: () => api.fetchUsers(),
builder: (context, state) {
return state.when(
loading: () => Center(child: CircularProgressIndicator()),
error: (error, stack) => Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text('Error: $error'),
ElevatedButton(
onPressed: () {
QueryClient().invalidateQuery('users');
},
child: Text('Retry'),
),
],
),
),
data: (users) => ListView.builder(
itemCount: users.length,
itemBuilder: (context, index) {
final user = users[index];
return RealTimeUserTile(user: user);
},
),
);
},
),
floatingActionButton: FloatingActionButton(
onPressed: () {
Navigator.push(
context,
MaterialPageRoute(
builder: (context) => CreateUserScreen(),
),
);
},
child: Icon(Icons.add),
),
);
}
}
class RealTimeUserTile extends StatelessWidget {
final User user;
const RealTimeUserTile({required this.user});
@override
Widget build(BuildContext context) {
return Card(
child: ListTile(
title: Text(user.name),
subtitle: Text(user.email),
trailing: Row(
mainAxisSize: MainAxisSize.min,
children: [
// Show real-time indicator
Container(
width: 8,
height: 8,
decoration: BoxDecoration(
color: Colors.green,
shape: BoxShape.circle,
),
),
SizedBox(width: 8),
PopupMenuButton(
itemBuilder: (context) => [
PopupMenuItem(
value: 'edit',
child: Row(
children: [
Icon(Icons.edit),
SizedBox(width: 8),
Text('Edit'),
],
),
),
PopupMenuItem(
value: 'delete',
child: Row(
children: [
Icon(Icons.delete, color: Colors.red),
SizedBox(width: 8),
Text('Delete', style: TextStyle(color: Colors.red)),
],
),
),
],
onSelected: (value) {
switch (value) {
case 'edit':
Navigator.push(
context,
MaterialPageRoute(
builder: (context) => EditUserScreen(user: user),
),
);
break;
case 'delete':
_showDeleteDialog(context);
break;
}
},
),
],
),
),
);
}
void _showDeleteDialog(BuildContext context) {
showDialog(
context: context,
builder: (context) => AlertDialog(
title: Text('Delete User'),
content: Text('Are you sure you want to delete ${user.name}?'),
actions: [
TextButton(
onPressed: () => Navigator.pop(context),
child: Text('Cancel'),
),
TextButton(
onPressed: () {
Navigator.pop(context);
_deleteUser(context);
},
child: Text('Delete', style: TextStyle(color: Colors.red)),
),
],
),
);
}
void _deleteUser(BuildContext context) {
MutationBuilder<void, String>(
mutationFn: (id) => api.deleteUser(id),
options: MutationOptions(
onSuccess: (_, id) {
// WebSocket will handle the real-time update
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('User deleted successfully')),
);
},
onError: (error) {
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('Failed to delete user: $error')),
);
},
),
builder: (context, state) {
return state.mutate(user.id);
},
);
}
}
class SSEService {
static EventSource? _eventSource;
static void connect(String url) {
_eventSource = EventSource(url);
_eventSource!.onMessage = (event) {
try {
final data = jsonDecode(event.data);
_handleSSEMessage(data);
} catch (error) {
print('SSE message error: $error');
}
};
_eventSource!.onError = (error) {
print('SSE error: $error');
_reconnect();
};
}
static void _handleSSEMessage(Map<String, dynamic> data) {
final type = data['type'] as String;
final payload = data['payload'];
switch (type) {
case 'user_created':
_handleUserCreated(payload);
break;
case 'user_updated':
_handleUserUpdated(payload);
break;
case 'user_deleted':
_handleUserDeleted(payload);
break;
}
}
static void _handleUserCreated(Map<String, dynamic> data) {
final user = User.fromJson(data);
final users = QueryClient().getQueryData<List<User>>('users');
if (users != null) {
QueryClient().setQueryData('users', [...users, user]);
}
}
static void _handleUserUpdated(Map<String, dynamic> data) {
final updatedUser = User.fromJson(data);
final users = QueryClient().getQueryData<List<User>>('users');
if (users != null) {
final updatedUsers = users.map((user) =>
user.id == updatedUser.id ? updatedUser : user
).toList();
QueryClient().setQueryData('users', updatedUsers);
}
}
static void _handleUserDeleted(Map<String, dynamic> data) {
final userId = data['id'] as String;
final users = QueryClient().getQueryData<List<User>>('users');
if (users != null) {
final updatedUsers = users.where((user) => user.id != userId).toList();
QueryClient().setQueryData('users', updatedUsers);
}
}
static void _reconnect() {
Future.delayed(Duration(seconds: 5), () {
connect('http://localhost:8080/events');
});
}
static void disconnect() {
_eventSource?.close();
}
}
class SSERealTimeScreen extends StatefulWidget {
@override
State<SSERealTimeScreen> createState() => _SSERealTimeScreenState();
}
class _SSERealTimeScreenState extends State<SSERealTimeScreen> {
@override
void initState() {
super.initState();
SSEService.connect('http://localhost:8080/events');
}
@override
void dispose() {
SSEService.disconnect();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('SSE Real-time Users')),
body: QueryBuilder<List<User>>(
queryKey: 'users',
queryFn: () => api.fetchUsers(),
builder: (context, state) {
return state.when(
loading: () => Center(child: CircularProgressIndicator()),
error: (error, stack) => Center(child: Text('Error: $error')),
data: (users) => ListView.builder(
itemCount: users.length,
itemBuilder: (context, index) {
final user = users[index];
return ListTile(
title: Text(user.name),
subtitle: Text(user.email),
trailing: Container(
width: 8,
height: 8,
decoration: BoxDecoration(
color: Colors.blue,
shape: BoxShape.circle,
),
),
);
},
),
);
},
),
);
}
}
class PollingService {
static Timer? _timer;
static bool _isPolling = false;
static void startPolling({
required String queryKey,
required Future<dynamic> Function() queryFn,
Duration interval = const Duration(seconds: 30),
}) {
if (_isPolling) return;
_isPolling = true;
_timer = Timer.periodic(interval, (timer) {
_pollQuery(queryKey, queryFn);
});
}
static void stopPolling() {
_timer?.cancel();
_timer = null;
_isPolling = false;
}
static Future<void> _pollQuery(String queryKey, Future<dynamic> Function() queryFn) async {
try {
final data = await queryFn();
QueryClient().setQueryData(queryKey, data);
} catch (error) {
print('Polling error: $error');
}
}
}
class PollingUsersScreen extends StatefulWidget {
@override
State<PollingUsersScreen> createState() => _PollingUsersScreenState();
}
class _PollingUsersScreenState extends State<PollingUsersScreen> {
@override
void initState() {
super.initState();
PollingService.startPolling(
queryKey: 'users',
queryFn: () => api.fetchUsers(),
interval: Duration(seconds: 30),
);
}
@override
void dispose() {
PollingService.stopPolling();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Polling Users'),
actions: [
IconButton(
icon: Icon(Icons.refresh),
onPressed: () {
QueryClient().invalidateQuery('users');
},
),
],
),
body: QueryBuilder<List<User>>(
queryKey: 'users',
queryFn: () => api.fetchUsers(),
builder: (context, state) {
return Column(
children: [
// Show polling indicator
Container(
padding: EdgeInsets.all(8),
color: Colors.blue.shade50,
child: Row(
children: [
Icon(Icons.sync, size: 16),
SizedBox(width: 8),
Text('Polling every 30 seconds'),
],
),
),
Expanded(
child: state.when(
loading: () => Center(child: CircularProgressIndicator()),
error: (error, stack) => Center(child: Text('Error: $error')),
data: (users) => ListView.builder(
itemCount: users.length,
itemBuilder: (context, index) {
final user = users[index];
return ListTile(
title: Text(user.name),
subtitle: Text(user.email),
);
},
),
),
),
],
);
},
),
);
}
}
class NotificationService {
static void showNotification(String title, String body) {
// Show local notification
FlutterLocalNotificationsPlugin().show(
0,
title,
body,
NotificationDetails(
android: AndroidNotificationDetails(
'real_time_channel',
'Real-time Updates',
importance: Importance.high,
),
),
);
}
static void handleRealTimeUpdate(String type, Map<String, dynamic> data) {
switch (type) {
case 'user_created':
showNotification('New User', 'A new user has been created');
break;
case 'user_updated':
showNotification('User Updated', 'A user has been updated');
break;
case 'user_deleted':
showNotification('User Deleted', 'A user has been deleted');
break;
}
}
}
class RealTimeNotificationScreen extends StatefulWidget {
@override
State<RealTimeNotificationScreen> createState() => _RealTimeNotificationScreenState();
}
class _RealTimeNotificationScreenState extends State<RealTimeNotificationScreen> {
@override
void initState() {
super.initState();
_setupNotifications();
}
void _setupNotifications() {
// Initialize local notifications
FlutterLocalNotificationsPlugin().initialize(
InitializationSettings(
android: AndroidInitializationSettings('@mipmap/ic_launcher'),
),
);
// Connect to WebSocket for real-time updates
WebSocketService.connect('ws://localhost:8080/ws');
}
@override
void dispose() {
WebSocketService.disconnect();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Real-time Notifications')),
body: QueryBuilder<List<User>>(
queryKey: 'users',
queryFn: () => api.fetchUsers(),
builder: (context, state) {
return state.when(
loading: () => Center(child: CircularProgressIndicator()),
error: (error, stack) => Center(child: Text('Error: $error')),
data: (users) => ListView.builder(
itemCount: users.length,
itemBuilder: (context, index) {
final user = users[index];
return ListTile(
title: Text(user.name),
subtitle: Text(user.email),
trailing: Icon(Icons.notifications_active),
);
},
),
);
},
),
);
}
}
class CollaborativeEditorScreen extends StatefulWidget {
final String documentId;
const CollaborativeEditorScreen({required this.documentId});
@override
State<CollaborativeEditorScreen> createState() => _CollaborativeEditorScreenState();
}
class _CollaborativeEditorScreenState extends State<CollaborativeEditorScreen> {
final _textController = TextEditingController();
List<User> _collaborators = [];
@override
void initState() {
super.initState();
_setupCollaboration();
}
void _setupCollaboration() {
// Connect to WebSocket for collaborative editing
WebSocketService.connect('ws://localhost:8080/collaborate');
// Listen for document updates
WebSocketService._channel?.stream.listen((data) {
final message = jsonDecode(data);
if (message['type'] == 'document_updated') {
_handleDocumentUpdate(message['data']);
} else if (message['type'] == 'collaborator_joined') {
_handleCollaboratorJoined(message['data']);
} else if (message['type'] == 'collaborator_left') {
_handleCollaboratorLeft(message['data']);
}
});
}
void _handleDocumentUpdate(Map<String, dynamic> data) {
if (data['documentId'] == widget.documentId) {
setState(() {
_textController.text = data['content'];
});
}
}
void _handleCollaboratorJoined(Map<String, dynamic> data) {
setState(() {
_collaborators.add(User.fromJson(data));
});
}
void _handleCollaboratorLeft(Map<String, dynamic> data) {
setState(() {
_collaborators.removeWhere((user) => user.id == data['userId']);
});
}
void _sendDocumentUpdate() {
WebSocketService._channel?.sink.add(jsonEncode({
'type': 'document_update',
'data': {
'documentId': widget.documentId,
'content': _textController.text,
},
}));
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Collaborative Editor'),
actions: [
// Show collaborators
IconButton(
icon: Icon(Icons.people),
onPressed: () {
_showCollaboratorsDialog();
},
),
],
),
body: Column(
children: [
// Show active collaborators
Container(
padding: EdgeInsets.all(8),
color: Colors.blue.shade50,
child: Row(
children: [
Text('Collaborators: '),
..._collaborators.map((user) =>
Container(
margin: EdgeInsets.only(right: 8),
padding: EdgeInsets.symmetric(horizontal: 8, vertical: 4),
decoration: BoxDecoration(
color: Colors.blue,
borderRadius: BorderRadius.circular(12),
),
child: Text(
user.name,
style: TextStyle(color: Colors.white, fontSize: 12),
),
),
),
],
),
),
Expanded(
child: TextField(
controller: _textController,
maxLines: null,
decoration: InputDecoration(
hintText: 'Start typing...',
border: InputBorder.none,
contentPadding: EdgeInsets.all(16),
),
onChanged: (value) {
_sendDocumentUpdate();
},
),
),
],
),
);
}
void _showCollaboratorsDialog() {
showDialog(
context: context,
builder: (context) => AlertDialog(
title: Text('Active Collaborators'),
content: Column(
mainAxisSize: MainAxisSize.min,
children: _collaborators.map((user) =>
ListTile(
title: Text(user.name),
subtitle: Text(user.email),
),
).toList(),
),
actions: [
TextButton(
onPressed: () => Navigator.pop(context),
child: Text('Close'),
),
],
),
);
}
}
  1. Handle connection errors - Implement reconnection logic
  2. Optimize updates - Only update changed data
  3. Use appropriate transport - Choose WebSocket, SSE, or polling based on needs
  4. Implement conflict resolution - Handle concurrent updates
  5. Show connection status - Indicate real-time connection state
  6. Handle offline scenarios - Gracefully handle network issues
  7. Optimize performance - Use efficient data structures and algorithms