自定义操作符的概念
自定义操作符允许你创建可重用的 RxJS 操作符,封装特定的业务逻辑或数据处理模式。这有助于提高代码的可读性、可维护性和复用性。
创建自定义操作符的方法
1. 使用 Observable.create
最基本的方法,直接创建 Observable。
javascriptimport { Observable } from 'rxjs'; function myCustomOperator<T>(source$: Observable<T>): Observable<T> { return new Observable(subscriber => { return source$.subscribe({ next: value => { // 处理每个值 subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); }); } // 使用 of(1, 2, 3).pipe( myCustomOperator() ).subscribe(console.log);
2. 使用 pipeable 操作符
推荐的方法,创建可管道化的操作符。
javascriptimport { Observable, OperatorFunction } from 'rxjs'; function myCustomOperator<T>(): OperatorFunction<T, T> { return (source$: Observable<T>) => new Observable(subscriber => { return source$.subscribe({ next: value => { // 处理逻辑 subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); }); } // 使用 of(1, 2, 3).pipe( myCustomOperator() ).subscribe(console.log);
3. 使用现有操作符组合
通过组合现有操作符来创建新的操作符。
javascriptimport { Observable, pipe } from 'rxjs'; import { map, filter, tap } from 'rxjs/operators'; function processAndFilter<T>( predicate: (value: T) => boolean, processor: (value: T) => T ) { return pipe( map(processor), filter(predicate), tap(value => console.log('Processed:', value)) ); } // 使用 of(1, 2, 3, 4, 5).pipe( processAndFilter( x => x > 2, x => x * 2 ) ).subscribe(console.log); // 输出: 6, 8, 10
实际应用示例
1. 日志操作符
javascriptimport { Observable, OperatorFunction } from 'rxjs'; import { tap } from 'rxjs/operators'; function log<T>(prefix: string = ''): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap({ next: value => console.log(`${prefix}Next:`, value), error: error => console.error(`${prefix}Error:`, error), complete: () => console.log(`${prefix}Complete`) }) ); } // 使用 of(1, 2, 3).pipe( log('Data: ') ).subscribe(console.log); // 输出: // Data: Next: 1 // Data: Next: 2 // Data: Next: 3 // Data: Complete
2. 重试操作符
javascriptimport { Observable, OperatorFunction, throwError, of } from 'rxjs'; import { retryWhen, delay, take, scan, tap } from 'rxjs/operators'; function retryWithBackoff<T>( maxRetries: number = 3, delayMs: number = 1000 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) { throw error; } return retryCount + 1; }, 0), tap(retryCount => console.log(`Retry attempt ${retryCount + 1}/${maxRetries}`) ), delay(delayMs) ) ) ); } // 使用 of(1).pipe( map(() => { throw new Error('Network error'); }), retryWithBackoff(3, 1000) ).subscribe({ next: console.log, error: error => console.error('Failed:', error.message) });
3. 缓存操作符
javascriptimport { Observable, OperatorFunction } from 'rxjs'; import { shareReplay, tap } from 'rxjs/operators'; function cache<T>( bufferSize: number = 1, windowTime: number = 0 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap(() => console.log('Fetching data...')), shareReplay(bufferSize, windowTime) ); } // 使用 const data$ = http.get('/api/data').pipe( cache(1, 60000) // 缓存 1 个值,60 秒 ); // 第一次调用会发起请求 data$.subscribe(data => console.log('First:', data)); // 第二次调用会使用缓存 setTimeout(() => { data$.subscribe(data => console.log('Second:', data)); }, 1000);
4. 防抖操作符
javascriptimport { Observable, OperatorFunction, timer } from 'rxjs'; import { debounceTime, distinctUntilChanged } from 'rxjs/operators'; function smartDebounce<T>( delayMs: number = 300, comparator: (prev: T, curr: T) => boolean = (a, b) => a === b ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( debounceTime(delayMs), distinctUntilChanged(comparator) ); } // 使用 fromEvent(inputElement, 'input').pipe( map(event => event.target.value), smartDebounce(300) ).subscribe(value => { console.log('Debounced value:', value); });
5. 分页操作符
javascriptimport { Observable, OperatorFunction, from } from 'rxjs'; import { concatMap, scan, map } from 'rxjs/operators'; function paginate<T>( pageSize: number, fetchPage: (page: number) => Observable<T[]> ): OperatorFunction<void, T> { return (source$: Observable<void>) => source$.pipe( concatMap(() => { let currentPage = 0; let allItems: T[] = []; return new Observable<T>(subscriber => { function loadNextPage() { fetchPage(currentPage).pipe( map(items => { allItems = [...allItems, ...items]; items.forEach(item => subscriber.next(item)); if (items.length < pageSize) { subscriber.complete(); } else { currentPage++; loadNextPage(); } }) ).subscribe(); } loadNextPage(); }); }) ); } // 使用 function fetchPage(page: number): Observable<number[]> { return of( Array.from({ length: 10 }, (_, i) => page * 10 + i) ).pipe(delay(500)); } of(undefined).pipe( paginate(10, fetchPage) ).subscribe(item => { console.log('Item:', item); });
6. 错误恢复操作符
javascriptimport { Observable, OperatorFunction, of, throwError } from 'rxjs'; import { catchError } from 'rxjs/operators'; function withFallback<T>( fallbackValue: T, shouldFallback: (error: any) => boolean = () => true ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( catchError(error => { if (shouldFallback(error)) { console.warn('Using fallback value'); return of(fallbackValue); } return throwError(() => error); }) ); } // 使用 http.get('/api/data').pipe( withFallback([], error => error.status === 404) ).subscribe(data => { console.log('Data:', data); });
高级自定义操作符
1. 状态管理操作符
javascriptimport { Observable, OperatorFunction, BehaviorSubject } from 'rxjs'; import { tap, switchMap } from 'rxjs/operators'; function withState<T, S>( initialState: S, reducer: (state: S, value: T) => S ): OperatorFunction<T, [T, S]> { return (source$: Observable<T>) => { const state$ = new BehaviorSubject<S>(initialState); return source$.pipe( tap(value => { const newState = reducer(state$.value, value); state$.next(newState); }), switchMap(value => state$.pipe( map(state => [value, state] as [T, S]) ) ) ); }; } // 使用 of(1, 2, 3, 4, 5).pipe( withState(0, (sum, value) => sum + value) ).subscribe(([value, sum]) => { console.log(`Value: ${value}, Sum: ${sum}`); }); // 输出: // Value: 1, Sum: 1 // Value: 2, Sum: 3 // Value: 3, Sum: 6 // Value: 4, Sum: 10 // Value: 5, Sum: 15
2. 性能监控操作符
javascriptimport { Observable, OperatorFunction } from 'rxjs'; import { tap, finalize } from 'rxjs/operators'; function measurePerformance<T>( label: string = 'Operation' ): OperatorFunction<T, T> { return (source$: Observable<T>) => { const startTime = performance.now(); return source$.pipe( tap({ next: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Next: ${elapsed.toFixed(2)}ms`); }, complete: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Complete: ${elapsed.toFixed(2)}ms`); } }), finalize(() => { const elapsed = performance.now() - startTime; console.log(`${label} - Total: ${elapsed.toFixed(2)}ms`); }) ); }; } // 使用 http.get('/api/data').pipe( measurePerformance('API Call') ).subscribe(data => { console.log('Data:', data); });
3. 批量处理操作符
javascriptimport { Observable, OperatorFunction, timer } from 'rxjs'; import { bufferTime, filter, mergeMap } from 'rxjs/operators'; function batch<T>( batchSize: number, batchTimeout: number = 1000 ): OperatorFunction<T, T[]> { return (source$: Observable<T>) => source$.pipe( bufferTime(batchTimeout), filter(batch => batch.length > 0), mergeMap(batch => { // 如果批次大小超过阈值,分割成更小的批次 if (batch.length > batchSize) { const batches: T[][] = []; for (let i = 0; i < batch.length; i += batchSize) { batches.push(batch.slice(i, i + batchSize)); } return from(batches); } return of(batch); }) ); } // 使用 interval(100).pipe( take(25), batch(10, 500) ).subscribe(batch => { console.log('Batch:', batch); });
最佳实践
1. 保持操作符纯粹
javascript// ✅ 好的做法:纯粹的操作符 function double<T extends number>(): OperatorFunction<T, T> { return map(x => x * 2 as T); } // ❌ 不好的做法:有副作用的操作符 function doubleWithSideEffect<T extends number>(): OperatorFunction<T, T> { return map(x => { console.log('Doubling:', x); // 副作用 return x * 2 as T; }); }
2. 提供合理的默认值
javascriptfunction retry<T>( maxRetries: number = 3, delayMs: number = 1000 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) throw error; return retryCount + 1; }, 0), delay(delayMs) ) ) ); }
3. 正确处理错误
javascriptfunction safeMap<T, R>( project: (value: T) => R, errorHandler?: (error: any) => R ): OperatorFunction<T, R> { return (source$: Observable<T>) => source$.pipe( map(value => { try { return project(value); } catch (error) { if (errorHandler) { return errorHandler(error); } throw error; } }) ); }
4. 提供类型安全
javascriptfunction filterByProperty<T, K extends keyof T>( property: K, value: T[K] ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( filter(item => item[property] === value) ); } // 使用 interface User { id: number; name: string; role: 'admin' | 'user'; } of<User>( { id: 1, name: 'Alice', role: 'admin' }, { id: 2, name: 'Bob', role: 'user' } ).pipe( filterByProperty('role', 'admin') ).subscribe(user => { console.log('Admin:', user.name); });
测试自定义操作符
javascriptimport { TestScheduler } from 'rxjs/testing'; describe('Custom Operators', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); it('should double values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe(double()); expectObservable(result$).toBe(expected, { a: 1, b: 2, c: 3, A: 2, B: 4, C: 6 }); }); }); });
总结
创建自定义 RxJS 操作符的关键点:
- 使用 pipeable 操作符: 推荐使用 OperatorFunction 类型
- 保持纯粹: 避免不必要的副作用
- 正确处理错误: 提供错误处理机制
- 提供类型安全: 使用 TypeScript 泛型
- 合理默认值: 提供合理的默认参数
- 可测试性: 确保操作符易于测试
- 文档完善: 提供清晰的文档和示例
自定义操作符可以显著提升代码的可读性和可维护性,是 RxJS 高级开发的重要技能。