错误处理的重要性
在 RxJS 中,错误处理至关重要,因为 Observable 流中的任何错误都会导致整个流终止。如果不正确处理错误,可能会导致:
- 应用崩溃
- 数据丢失
- 用户体验下降
- 调试困难
常用错误处理操作符
1. catchError
catchError 是最常用的错误处理操作符,它捕获错误并返回一个新的 Observable。
基本用法:
javascriptimport { of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; of(1, 2, 3, 4).pipe( map(x => { if (x === 3) throw new Error('Error at 3'); return x; }), catchError(error => { console.error('Caught error:', error.message); return of('default value'); }) ).subscribe(console.log); // 输出: 1, 2, 'default value'
高级用法 - 恢复性错误处理:
javascriptimport { of, throwError } from 'rxjs'; import { map, catchError, retry } from 'rxjs/operators'; function fetchData(id: number) { return of({ id, data: `Data ${id}` }).pipe( map(response => { if (id === 2) throw new Error('Invalid ID'); return response; }) ); } of(1, 2, 3).pipe( mergeMap(id => fetchData(id).pipe( catchError(error => { console.error(`Error for ID ${id}:`, error.message); return of({ id, data: 'fallback data' }); }) )) ).subscribe(result => console.log(result)); // 输出: {id: 1, data: "Data 1"}, {id: 2, data: "fallback data"}, {id: 3, data: "Data 3"}
2. retry
retry 操作符在遇到错误时重新订阅源 Observable。
基本用法:
javascriptimport { of, throwError } from 'rxjs'; import { map, retry } from 'rxjs/operators'; let attempts = 0; const source$ = of(1, 2, 3).pipe( map(x => { attempts++; if (attempts < 3) throw new Error('Temporary error'); return x; }), retry(2) // 重试2次 ); source$.subscribe({ next: value => console.log('Success:', value), error: error => console.error('Failed:', error.message) }); // 输出: Success: 1, Success: 2, Success: 3
带延迟的重试:
javascriptimport { of, throwError, timer } from 'rxjs'; import { map, retryWhen, delayWhen, tap } from 'rxjs/operators'; let attempts = 0; const source$ = of(1).pipe( map(() => { attempts++; if (attempts < 3) throw new Error('Temporary error'); return 'Success'; }), retryWhen(errors => errors.pipe( tap(error => console.log(`Attempt ${attempts} failed`)), delayWhen(() => timer(1000)) // 每次重试延迟1秒 ) ) ); source$.subscribe(console.log);
3. retryWhen
retryWhen 提供更灵活的重试控制,可以自定义重试逻辑。
指数退避重试:
javascriptimport { of, throwError, timer } from 'rxjs'; import { map, retryWhen, tap, scan, delayWhen } from 'rxjs/operators'; let attempts = 0; const source$ = of(1).pipe( map(() => { attempts++; if (attempts < 3) throw new Error('Temporary error'); return 'Success'; }), retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= 3) throw error; return retryCount + 1; }, 0), tap(retryCount => console.log(`Retry attempt ${retryCount + 1}`)), delayWhen(retryCount => timer(Math.pow(2, retryCount) * 1000)) ) ) ); source$.subscribe(console.log);
4. finalize
finalize 在 Observable 完成或出错时执行清理操作。
基本用法:
javascriptimport { of } from 'rxjs'; import { map, finalize } from 'rxjs/operators'; of(1, 2, 3).pipe( map(x => x * 2), finalize(() => { console.log('Cleanup completed'); }) ).subscribe(console.log); // 输出: 2, 4, 6, Cleanup completed
清理资源:
javascriptimport { interval } from 'rxjs'; import { take, finalize } from 'rxjs/operators'; let connection: any = null; const data$ = interval(1000).pipe( take(5), finalize(() => { console.log('Closing connection...'); if (connection) { connection.close(); connection = null; } }) ); data$.subscribe(value => { if (!connection) { connection = { close: () => console.log('Connection closed') }; } console.log('Received:', value); });
5. onErrorResumeNext
onErrorResumeNext 在遇到错误时继续执行下一个 Observable。
基本用法:
javascriptimport { of, onErrorResumeNext } from 'rxjs'; const source1$ = of(1, 2, 3).pipe( map(x => { if (x === 2) throw new Error('Error'); return x; }) ); const source2$ = of(4, 5, 6); onErrorResumeNext(source1$, source2$).subscribe(console.log); // 输出: 1, 4, 5, 6
实际应用场景
1. HTTP 请求错误处理
javascriptimport { HttpClient } from '@angular/common/http'; import { of, throwError } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; class DataService { constructor(private http: HttpClient) {} fetchData(id: string) { return this.http.get(`/api/data/${id}`).pipe( retry(3), // 重试3次 catchError(error => { console.error('Failed to fetch data:', error); if (error.status === 404) { return of(null); // 返回 null 而不是错误 } return throwError(() => new Error('Failed to load data')); }) ); } }
2. 表单验证错误处理
javascriptimport { fromEvent } from 'rxjs'; import { debounceTime, map, catchError } from 'rxjs/operators'; const input$ = fromEvent(document.getElementById('email'), 'input').pipe( debounceTime(300), map(event => event.target.value), map(email => { if (!this.isValidEmail(email)) { throw new Error('Invalid email format'); } return email; }), catchError(error => { console.error('Validation error:', error.message); return of(''); // 返回空字符串 }) ); input$.subscribe(email => { console.log('Valid email:', email); });
3. WebSocket 连接错误处理
javascriptimport { webSocket } from 'rxjs/webSocket'; import { retryWhen, delay, tap } from 'rxjs/operators'; function createWebSocket(url: string) { return webSocket(url).pipe( retryWhen(errors => errors.pipe( tap(error => console.error('WebSocket error:', error)), delay(5000) // 5秒后重试 ) ) ); } const socket$ = createWebSocket('ws://localhost:8080'); socket$.subscribe({ next: message => console.log('Received:', message), error: error => console.error('Connection failed:', error), complete: () => console.log('Connection closed') });
4. 文件上传错误处理
javascriptimport { from } from 'rxjs'; import { map, catchError, finalize } from 'rxjs/operators'; function uploadFile(file: File) { return from(uploadToServer(file)).pipe( map(response => { if (!response.success) { throw new Error('Upload failed'); } return response; }), catchError(error => { console.error('Upload error:', error); return of({ success: false, error: error.message }); }), finalize(() => { console.log('Upload process completed'); }) ); } uploadFile(file).subscribe(result => { if (result.success) { console.log('File uploaded successfully'); } else { console.error('Upload failed:', result.error); } });
错误处理最佳实践
1. 分层错误处理
javascriptimport { of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; // 第一层:操作级错误处理 const processed$ = source$.pipe( map(data => processData(data)), catchError(error => { console.error('Processing error:', error); return of(defaultData); }) ); // 第二层:订阅级错误处理 processed$.subscribe({ next: data => console.log('Data:', data), error: error => console.error('Subscription error:', error) });
2. 错误类型分类处理
javascriptimport { of, throwError } from 'rxjs'; import { catchError } from 'rxjs/operators'; function handleApiError(error: any) { if (error.status === 401) { // 未授权,跳转到登录页 return throwError(() => new Error('Unauthorized')); } else if (error.status === 404) { // 资源不存在,返回默认值 return of(null); } else if (error.status >= 500) { // 服务器错误,重试 return throwError(() => error); } else { // 其他错误 return of(null); } } apiCall().pipe( catchError(handleApiError) ).subscribe();
3. 错误日志记录
javascriptimport { of } from 'rxjs'; import { catchError, tap } from 'rxjs/operators'; function logError(error: any, context: string) { console.error(`[${context}] Error:`, error); // 发送到错误跟踪服务 errorTrackingService.log(error, context); } apiCall().pipe( tap({ error: error => logError(error, 'API Call') }), catchError(error => { return of(fallbackData); }) ).subscribe();
4. 用户友好的错误消息
javascriptimport { of } from 'rxjs'; import { catchError } from 'rxjs/operators'; function getUserFriendlyMessage(error: any): string { const errorMap = { 'Network Error': '网络连接失败,请检查您的网络', 'Timeout': '请求超时,请稍后重试', 'Unauthorized': '请先登录', 'default': '发生错误,请稍后重试' }; return errorMap[error.message] || errorMap['default']; } apiCall().pipe( catchError(error => { const userMessage = getUserFriendlyMessage(error); showNotification(userMessage); return of(null); }) ).subscribe();
常见错误处理模式
1. 重试模式
javascriptimport { of, throwError } from 'rxjs'; import { retry, delayWhen, tap, timer } from 'rxjs/operators'; function retryWithBackoff(maxRetries: number, delayMs: number) { return (source$) => source$.pipe( retryWhen(errors => errors.pipe( tap(error => console.error('Error:', error)), scan((retryCount, error) => { if (retryCount >= maxRetries) throw error; return retryCount + 1; }, 0), delayWhen(retryCount => timer(Math.pow(2, retryCount) * delayMs)) ) ) ); } apiCall().pipe( retryWithBackoff(3, 1000) ).subscribe();
2. 降级模式
javascriptimport { of } from 'rxjs'; import { catchError } from 'rxjs/operators'; function withFallback<T>(fallback: T) { return (source$: Observable<T>) => source$.pipe( catchError(error => { console.warn('Using fallback:', error.message); return of(fallback); }) ); } apiCall().pipe( withFallback(defaultData) ).subscribe();
3. 断路器模式
javascriptimport { of, throwError } from 'rxjs'; import { catchError, scan, tap } from 'rxjs/operators'; let failureCount = 0; const threshold = 5; const resetTimeout = 60000; // 1分钟 function circuitBreaker<T>(source$: Observable<T>): Observable<T> { return source$.pipe( tap({ error: () => failureCount++, next: () => failureCount = 0 }), catchError(error => { if (failureCount >= threshold) { return throwError(() => new Error('Circuit breaker open')); } return throwError(() => error); }) ); } apiCall().pipe( circuitBreaker ).subscribe();
总结
RxJS 错误处理的关键点:
- catchError: 捕获错误并返回新的 Observable
- retry/retryWhen: 实现重试逻辑
- finalize: 执行清理操作
- onErrorResumeNext: 遇到错误时继续执行
- 分层处理: 在不同层级处理不同类型的错误
- 用户友好: 提供清晰的错误消息
- 日志记录: 记录错误以便调试
- 重试策略: 合理设置重试次数和延迟
正确处理错误可以显著提升应用的稳定性和用户体验。