乐闻世界logo
搜索文章和话题

How to handle errors in RxJS? What are the error handling operators?

2月21日 16:23

Importance of Error Handling

In RxJS, error handling is crucial because any error in an Observable stream will cause the entire stream to terminate. Without proper error handling, it can lead to:

  • Application crashes
  • Data loss
  • Poor user experience
  • Difficult debugging

Common Error Handling Operators

1. catchError

catchError is the most commonly used error handling operator, which catches errors and returns a new Observable.

Basic usage:

javascript
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

// The source throws on its third value. catchError replaces the failed
// stream with a one-value fallback Observable, so the value 4 is never seen.
const numbers$ = of(1, 2, 3, 4);

numbers$
  .pipe(
    map(value => {
      if (value !== 3) {
        return value;
      }
      throw new Error('Error at 3');
    }),
    catchError(err => {
      console.error('Caught error:', err.message);
      return of('default value');
    })
  )
  .subscribe(emitted => console.log(emitted));
// Output: 1, 2, 'default value'

Advanced usage - Recoverable error handling:

javascript
import { of } from 'rxjs';
import { map, mergeMap, catchError } from 'rxjs/operators';

/**
 * Simulated lookup: emits `{ id, data }` for the given id, except that
 * id 2 makes the stream fail with an 'Invalid ID' error.
 */
function fetchData(id: number) {
  return of({ id, data: `Data ${id}` }).pipe(
    map(response => {
      if (id === 2) throw new Error('Invalid ID');
      return response;
    })
  );
}

of(1, 2, 3).pipe(
  mergeMap(id => fetchData(id).pipe(
    // catchError sits on the inner Observable, so a failure for one id is
    // replaced by a fallback value and the outer stream keeps running.
    catchError(error => {
      console.error(`Error for ID ${id}:`, error.message);
      return of({ id, data: 'fallback data' });
    })
  ))
).subscribe(result => console.log(result));
// Output: {id: 1, data: "Data 1"}, {id: 2, data: "fallback data"}, {id: 3, data: "Data 3"}

2. retry

The retry operator resubscribes to the source Observable when an error occurs.

Basic usage:

javascript
import { of, throwError } from 'rxjs';
import { map, retry } from 'rxjs/operators';

// Counts every value that passes through map, across all subscriptions.
let attempts = 0;

const source$ = of(1, 2, 3).pipe(
  map(x => {
    attempts += 1;
    // The first two invocations fail; from the third one on, values pass through.
    if (attempts >= 3) {
      return x;
    }
    throw new Error('Temporary error');
  }),
  retry(2) // Retry 2 times
);

source$.subscribe({
  error: err => console.error('Failed:', err.message),
  next: emitted => console.log('Success:', emitted)
});
// Output: Success: 1, Success: 2, Success: 3

Retry with delay:

javascript
import { of, timer } from 'rxjs';
import { map, retry } from 'rxjs/operators';

// Counts how many times the map callback has run across resubscriptions.
let attempts = 0;

const source$ = of(1).pipe(
  map(() => {
    attempts++;
    // Fail on the first two runs, succeed on the third.
    if (attempts < 3) throw new Error('Temporary error');
    return 'Success';
  }),
  // retry accepts a config object; `delay` returns a notifier Observable and
  // the resubscription happens when that notifier emits. This replaces the
  // deprecated retryWhen + delayWhen combination. No `count` is given, so
  // retries are unbounded, as before.
  retry({
    delay: () => {
      console.log(`Attempt ${attempts} failed`);
      return timer(1000); // Delay 1 second between retries
    }
  })
);

source$.subscribe(console.log);

3. retryWhen

retryWhen provides more flexible retry control, allowing custom retry logic.

Exponential backoff retry:

javascript
import { of, throwError, timer } from 'rxjs';
import { map, retryWhen, tap, scan, delayWhen } from 'rxjs/operators';

// Counts how many times the map callback has run across resubscriptions.
let attempts = 0;

const source$ = of(1).pipe(
  map(() => {
    attempts += 1;
    if (attempts >= 3) {
      return 'Success';
    }
    throw new Error('Temporary error');
  }),
  retryWhen(failures$ =>
    failures$.pipe(
      // Running count of failures; once three retries have been used,
      // the error is rethrown and the whole stream fails.
      scan((count, err) => {
        if (count < 3) {
          return count + 1;
        }
        throw err;
      }, 0),
      tap(count => console.log(`Retry attempt ${count + 1}`)),
      // Wait 2^count seconds before resubscribing.
      delayWhen(count => timer(2 ** count * 1000))
    )
  )
);

source$.subscribe(result => console.log(result));

4. finalize

finalize runs a callback when the Observable terminates — on completion, on error, or when the subscriber unsubscribes — which makes it the right place for cleanup operations.

Basic usage:

javascript
import { of } from 'rxjs';
import { map, finalize } from 'rxjs/operators';

// Doubles each value; the finalize callback logs once the stream has ended.
const doubled$ = of(1, 2, 3).pipe(
  map(n => n * 2),
  finalize(() => console.log('Cleanup completed'))
);

doubled$.subscribe(emitted => console.log(emitted));
// Output: 2, 4, 6, Cleanup completed

Resource cleanup:

javascript
import { interval } from 'rxjs';
import { take, finalize } from 'rxjs/operators';

// Stand-in for a real resource; created lazily by the subscriber below.
let connection: any = null;

const data$ = interval(1000).pipe(
  take(5),
  finalize(() => {
    console.log('Closing connection...');
    // Nothing to release if no value ever arrived.
    if (!connection) {
      return;
    }
    connection.close();
    connection = null;
  })
);

data$.subscribe(tick => {
  // Open the fake connection on the first emission only.
  if (!connection) {
    connection = {
      close: () => console.log('Connection closed')
    };
  }
  console.log('Received:', tick);
});

5. onErrorResumeNext

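onErrorResumeNext subscribes to each Observable it is given in order: when the current one errors or completes, it moves on to the next one, and the error itself is silently discarded.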

Basic usage:

javascript
import { of, onErrorResumeNext } from 'rxjs';
import { map } from 'rxjs/operators';

// First source: emits 1, then throws while processing 2.
const source1$ = of(1, 2, 3).pipe(
  map(x => {
    if (x === 2) throw new Error('Error');
    return x;
  })
);

// Second source: used once the first one has stopped.
const source2$ = of(4, 5, 6);

// The error from source1$ is dropped and source2$ is subscribed next.
onErrorResumeNext(source1$, source2$).subscribe(console.log);
// Output: 1, 4, 5, 6

Practical Use Cases

1. HTTP Request Error Handling

javascript
import { HttpClient } from '@angular/common/http';
import { of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

/** Loads records over HTTP, retrying and then mapping failures. */
class DataService {
  constructor(private http: HttpClient) {}

  /**
   * Requests `/api/data/{id}`.
   *
   * The request is retried up to three times. If it still fails, a 404
   * yields `null`; any other failure is rethrown as a generic Error.
   */
  fetchData(id: string) {
    const request$ = this.http.get(`/api/data/${id}`);

    return request$.pipe(
      retry(3), // Retry 3 times
      catchError(error => {
        console.error('Failed to fetch data:', error);
        return error.status === 404
          ? of(null) // Return null instead of error
          : throwError(() => new Error('Failed to load data'));
      })
    );
  }
}

2. Form Validation Error Handling

javascript
import { fromEvent, of } from 'rxjs';
import { debounceTime, map, switchMap, catchError } from 'rxjs/operators';

// NOTE(review): `this.isValidEmail` is not defined in this snippet; it is
// presumably a method of the enclosing component — confirm before reuse.
const input$ = fromEvent(document.getElementById('email'), 'input').pipe(
  debounceTime(300),
  map(event => event.target.value),
  // Validate inside an inner Observable. An error caught on the outer stream
  // would end it after the first invalid value, and later keystrokes would
  // be ignored; catching on the inner stream keeps the input stream alive.
  switchMap(email =>
    of(email).pipe(
      map(value => {
        if (!this.isValidEmail(value)) {
          throw new Error('Invalid email format');
        }
        return value;
      }),
      catchError(error => {
        console.error('Validation error:', error.message);
        return of(''); // Return empty string
      })
    )
  )
);

input$.subscribe(email => {
  console.log('Valid email:', email);
});

3. WebSocket Connection Error Handling

javascript
import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap } from 'rxjs/operators';

/**
 * Opens a WebSocket subject for `url` that resubscribes after every error:
 * each error is logged, then the retry happens five seconds later.
 */
function createWebSocket(url: string) {
  const socket = webSocket(url);

  return socket.pipe(
    retryWhen(failures$ =>
      failures$.pipe(
        tap(err => console.error('WebSocket error:', err)),
        delay(5000) // Retry after 5 seconds
      )
    )
  );
}

const socket$ = createWebSocket('ws://localhost:8080');

socket$.subscribe({
  complete: () => console.log('Connection closed'),
  error: err => console.error('Connection failed:', err),
  next: msg => console.log('Received:', msg)
});

4. File Upload Error Handling

javascript
import { from, of } from 'rxjs';
import { map, catchError, finalize } from 'rxjs/operators';

/**
 * Uploads `file` and always emits a result object instead of erroring.
 *
 * A response whose `success` flag is falsy is turned into an error, and every
 * error is converted by catchError into `{ success: false, error }`, so
 * subscribers only need a `next` handler. finalize logs when the stream ends.
 */
function uploadFile(file: File) {
  return from(uploadToServer(file)).pipe(
    map(response => {
      if (!response.success) {
        throw new Error('Upload failed');
      }
      return response;
    }),
    catchError(error => {
      console.error('Upload error:', error);
      return of({ success: false, error: error.message });
    }),
    finalize(() => {
      console.log('Upload process completed');
    })
  );
}

uploadFile(file).subscribe(result => {
  if (result.success) {
    console.log('File uploaded successfully');
  } else {
    console.error('Upload failed:', result.error);
  }
});

Error Handling Best Practices

1. Layered Error Handling

javascript
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

// First layer: operation-level handling. A failure while processing is
// logged and replaced with the default data.
const processed$ = source$.pipe(
  map(item => processData(item)),
  catchError(err => {
    console.error('Processing error:', err);
    return of(defaultData);
  })
);

// Second layer: subscription-level handling for anything that still
// reaches the subscriber as an error.
const observer = {
  next: data => console.log('Data:', data),
  error: err => console.error('Subscription error:', err)
};

processed$.subscribe(observer);

2. Error Type Classification Handling

javascript
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

/**
 * Maps an API error to a replacement Observable based on `error.status`:
 * 401 becomes an 'Unauthorized' error, 404 becomes `null`, 5xx and above
 * rethrow the original error, and everything else becomes `null`.
 */
function handleApiError(error: any) {
  // Unauthorized, redirect to login
  if (error.status === 401) {
    return throwError(() => new Error('Unauthorized'));
  }

  // Resource not found, return default value
  if (error.status === 404) {
    return of(null);
  }

  // Server error, retry
  if (error.status >= 500) {
    return throwError(() => error);
  }

  // Other errors
  return of(null);
}

apiCall()
  .pipe(catchError(handleApiError))
  .subscribe();

3. Error Logging

javascript
import { of } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';

/**
 * Writes the error to the console, prefixed with `context`, and forwards
 * it to the error tracking service.
 */
function logError(error: any, context: string) {
  console.error(`[${context}] Error:`, error);
  // Send to error tracking service
  errorTrackingService.log(error, context);
}

// tap observes the error for logging without handling it; catchError then
// replaces the failed stream with the fallback data.
const withLogging$ = apiCall().pipe(
  tap({
    error: err => logError(err, 'API Call')
  }),
  catchError(() => of(fallbackData))
);

withLogging$.subscribe();

4. User-Friendly Error Messages

javascript
import { of } from 'rxjs';
import { catchError } from 'rxjs/operators';

/**
 * Translates an error into a message suitable for showing to the user.
 *
 * Looks up `error.message` in a fixed table and falls back to a generic
 * message when there is no entry. A Map is used instead of a plain object
 * so that messages such as "toString" or "constructor" cannot resolve to
 * inherited Object.prototype members, and a null/undefined error yields the
 * generic message instead of throwing.
 *
 * @param error The caught error; only its `message` property is read.
 * @returns The user-facing message.
 */
function getUserFriendlyMessage(error: any): string {
  const defaultMessage = 'An error occurred, please try again later';
  const errorMap = new Map<string, string>([
    ['Network Error', 'Network connection failed, please check your network'],
    ['Timeout', 'Request timeout, please try again later'],
    ['Unauthorized', 'Please login first'],
    ['default', defaultMessage]
  ]);

  // `||` rather than `??` so an empty-string entry would also fall back.
  return errorMap.get(error?.message) || defaultMessage;
}

apiCall().pipe(
  catchError(error => {
    const userMessage = getUserFriendlyMessage(error);
    showNotification(userMessage);
    return of(null);
  })
).subscribe();

Common Error Handling Patterns

1. Retry Pattern

javascript
import { Observable, timer } from 'rxjs';
import { retryWhen, scan, tap, delayWhen } from 'rxjs/operators';

/**
 * Builds a pipeable operator that retries a failing source with
 * exponentially growing delays.
 *
 * @param maxRetries Number of retries allowed before the error is rethrown.
 * @param delayMs Base delay; the n-th retry waits `2^n * delayMs` ms.
 * @returns A function that can be passed to `pipe()`.
 */
function retryWithBackoff(maxRetries: number, delayMs: number) {
  return <T>(source$: Observable<T>): Observable<T> => source$.pipe(
    retryWhen(errors => errors.pipe(
      tap(error => console.error('Error:', error)),
      // Running retry counter; rethrowing here ends the retry loop and
      // makes the resulting Observable fail with the last error.
      scan((retryCount, error) => {
        if (retryCount >= maxRetries) throw error;
        return retryCount + 1;
      }, 0),
      delayWhen(retryCount => timer(Math.pow(2, retryCount) * delayMs))
    ))
  );
}

apiCall().pipe(
  retryWithBackoff(3, 1000)
).subscribe();

2. Fallback Pattern

javascript
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

/**
 * Builds a pipeable operator that replaces any error from the source with
 * a single `fallback` value, after logging a warning.
 *
 * @param fallback Value emitted in place of the error.
 * @returns A function that can be passed to `pipe()`.
 */
function withFallback<T>(fallback: T) {
  return (source$: Observable<T>): Observable<T> => source$.pipe(
    catchError(error => {
      console.warn('Using fallback:', error.message);
      return of(fallback);
    })
  );
}

apiCall().pipe(
  withFallback(defaultData)
).subscribe();

3. Circuit Breaker Pattern

javascript
import { Observable, defer, throwError } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';

// Breaker state shared by every stream wrapped with circuitBreaker.
let failureCount = 0;
let openedAt = 0; // Timestamp (ms) of the failure that last opened the breaker
const threshold = 5;
const resetTimeout = 60000; // 1 minute

/**
 * Wraps `source$` in a simple circuit breaker.
 *
 * - Each error increments the shared failure counter; any emitted value
 *   resets it to zero.
 * - Once the counter reaches `threshold`, the breaker is open: errors are
 *   reported as 'Circuit breaker open', and new subscriptions fail
 *   immediately without subscribing to `source$`.
 * - After `resetTimeout` ms the next subscription is let through as a trial;
 *   a value closes the breaker, another error re-opens it.
 *
 * @param source$ The Observable to protect.
 * @returns An Observable mirroring `source$` while the breaker is closed.
 */
function circuitBreaker<T>(source$: Observable<T>): Observable<T> {
  // defer evaluates the breaker state on every subscription rather than
  // once when the pipeline is built.
  return defer(() => {
    const isOpen =
      failureCount >= threshold && Date.now() - openedAt < resetTimeout;

    if (isOpen) {
      return throwError(() => new Error('Circuit breaker open'));
    }

    return source$.pipe(
      tap({
        next: () => {
          failureCount = 0;
        },
        error: () => {
          failureCount++;
          if (failureCount >= threshold) {
            openedAt = Date.now();
          }
        }
      }),
      catchError(error => {
        if (failureCount >= threshold) {
          return throwError(() => new Error('Circuit breaker open'));
        }
        return throwError(() => error);
      })
    );
  });
}

apiCall().pipe(
  circuitBreaker
).subscribe();

Summary

Key points of RxJS error handling:

  1. catchError: Catch errors and return new Observable
  2. retry/retryWhen: Implement retry logic
  3. finalize: Execute cleanup operations
  4. onErrorResumeNext: Continue execution when error occurs
  5. Layered handling: Handle different types of errors at different levels
  6. User-friendly: Provide clear error messages
  7. Logging: Record errors for debugging
  8. Retry strategy: Reasonably set retry count and delay

Proper error handling can significantly improve application stability and user experience.

标签:Rxjs