RxJS 中如何创建自定义操作符?
自定义操作符的概念自定义操作符允许你创建可重用的 RxJS 操作符,封装特定的业务逻辑或数据处理模式。这有助于提高代码的可读性、可维护性和复用性。创建自定义操作符的方法1. 使用 Observable 构造函数最基本的方法,直接通过 new Observable 创建 Observable(Observable.create 已被弃用)。import { Observable } from 'rxjs';function myCustomOperator<T>(source$: Observable<T>): Observable<T> { return new Observable(subscriber => { return source$.subscribe({ next: value => { // 处理每个值 subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); });}// 使用of(1, 2, 3).pipe( myCustomOperator).subscribe(console.log);2. 使用 pipeable 操作符推荐的方法,创建可管道化的操作符。import { Observable, OperatorFunction } from 'rxjs';function myCustomOperator<T>(): OperatorFunction<T, T> { return (source$: Observable<T>) => new Observable(subscriber => { return source$.subscribe({ next: value => { // 处理逻辑 subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); });}// 使用of(1, 2, 3).pipe( myCustomOperator()).subscribe(console.log);3. 使用现有操作符组合通过组合现有操作符来创建新的操作符。import { Observable, pipe } from 'rxjs';import { map, filter, tap } from 'rxjs/operators';function processAndFilter<T>( predicate: (value: T) => boolean, processor: (value: T) => T) { return pipe( map(processor), filter(predicate), tap(value => console.log('Processed:', value)) );}// 使用of(1, 2, 3, 4, 5).pipe( processAndFilter( x => x > 2, x => x * 2 )).subscribe(console.log);// 输出: 4, 6, 8, 10实际应用示例1. 日志操作符import { Observable, OperatorFunction } from 'rxjs';import { tap } from 'rxjs/operators';function log<T>(prefix: string = ''): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap({ next: value => console.log(`${prefix}Next:`, value), error: error => console.error(`${prefix}Error:`, error), complete: () => console.log(`${prefix}Complete`) }) );}// 使用of(1, 2, 3).pipe( log('Data: ')).subscribe(console.log);// 输出:// Data: Next: 1// Data: Next: 2// Data: Next: 3// Data: Complete2. 
重试操作符import { Observable, OperatorFunction, throwError, of } from 'rxjs';import { retryWhen, delay, take, scan, tap } from 'rxjs/operators';function retryWithBackoff<T>( maxRetries: number = 3, delayMs: number = 1000): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) { throw error; } return retryCount + 1; }, 0), tap(retryCount => console.log(`Retry attempt ${retryCount}/${maxRetries}`) ), delay(delayMs) ) ) );}// 使用of(1).pipe( map(() => { throw new Error('Network error'); }), retryWithBackoff(3, 1000)).subscribe({ next: console.log, error: error => console.error('Failed:', error.message)});3. 缓存操作符import { Observable, OperatorFunction } from 'rxjs';import { shareReplay, tap } from 'rxjs/operators';function cache<T>( bufferSize: number = 1, windowTime: number = Infinity): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap(() => console.log('Fetching data...')), shareReplay(bufferSize, windowTime) );}// 使用const data$ = http.get('/api/data').pipe( cache(1, 60000) // 缓存 1 个值,60 秒);// 第一次调用会发起请求data$.subscribe(data => console.log('First:', data));// 第二次调用会使用缓存setTimeout(() => { data$.subscribe(data => console.log('Second:', data));}, 1000);4. 防抖操作符import { Observable, OperatorFunction, timer } from 'rxjs';import { debounceTime, distinctUntilChanged } from 'rxjs/operators';function smartDebounce<T>( delayMs: number = 300, comparator: (prev: T, curr: T) => boolean = (a, b) => a === b): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( debounceTime(delayMs), distinctUntilChanged(comparator) );}// 使用fromEvent(inputElement, 'input').pipe( map(event => event.target.value), smartDebounce(300)).subscribe(value => { console.log('Debounced value:', value);});5. 
分页操作符import { Observable, OperatorFunction, from } from 'rxjs';import { concatMap, scan, map } from 'rxjs/operators';function paginate<T>( pageSize: number, fetchPage: (page: number) => Observable<T[]>): OperatorFunction<void, T> { return (source$: Observable<void>) => source$.pipe( concatMap(() => { let currentPage = 0; let allItems: T[] = []; return new Observable<T>(subscriber => { function loadNextPage() { fetchPage(currentPage).pipe( map(items => { allItems = [...allItems, ...items]; items.forEach(item => subscriber.next(item)); if (items.length < pageSize) { subscriber.complete(); } else { currentPage++; loadNextPage(); } }) ).subscribe(); } loadNextPage(); }); }) );}// 使用function fetchPage(page: number): Observable<number[]> { return of( Array.from({ length: 10 }, (_, i) => page * 10 + i) ).pipe(delay(500));}of(undefined).pipe( paginate(10, fetchPage)).subscribe(item => { console.log('Item:', item);});6. 错误恢复操作符import { Observable, OperatorFunction, of, throwError } from 'rxjs';import { catchError } from 'rxjs/operators';function withFallback<T>( fallbackValue: T, shouldFallback: (error: any) => boolean = () => true): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( catchError(error => { if (shouldFallback(error)) { console.warn('Using fallback value'); return of(fallbackValue); } return throwError(() => error); }) );}// 使用http.get('/api/data').pipe( withFallback([], error => error.status === 404)).subscribe(data => { console.log('Data:', data);});高级自定义操作符1. 
状态管理操作符import { Observable, OperatorFunction, BehaviorSubject } from 'rxjs';import { map, switchMap, take, tap } from 'rxjs/operators';function withState<T, S>( initialState: S, reducer: (state: S, value: T) => S): OperatorFunction<T, [T, S]> { return (source$: Observable<T>) => { const state$ = new BehaviorSubject<S>(initialState); return source$.pipe( tap(value => { const newState = reducer(state$.value, value); state$.next(newState); }), switchMap(value => state$.pipe( take(1), map(state => [value, state] as [T, S]) ) ) ); };}// 使用of(1, 2, 3, 4, 5).pipe( withState(0, (sum, value) => sum + value)).subscribe(([value, sum]) => { console.log(`Value: ${value}, Sum: ${sum}`);});// 输出:// Value: 1, Sum: 1// Value: 2, Sum: 3// Value: 3, Sum: 6// Value: 4, Sum: 10// Value: 5, Sum: 152. 性能监控操作符import { Observable, OperatorFunction } from 'rxjs';import { tap, finalize } from 'rxjs/operators';function measurePerformance<T>( label: string = 'Operation'): OperatorFunction<T, T> { return (source$: Observable<T>) => { const startTime = performance.now(); return source$.pipe( tap({ next: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Next: ${elapsed.toFixed(2)}ms`); }, complete: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Complete: ${elapsed.toFixed(2)}ms`); } }), finalize(() => { const elapsed = performance.now() - startTime; console.log(`${label} - Total: ${elapsed.toFixed(2)}ms`); }) ); };}// 使用http.get('/api/data').pipe( measurePerformance('API Call')).subscribe(data => { console.log('Data:', data);});3. 
批量处理操作符import { Observable, OperatorFunction, from, of, timer } from 'rxjs';import { bufferTime, filter, mergeMap } from 'rxjs/operators';function batch<T>( batchSize: number, batchTimeout: number = 1000): OperatorFunction<T, T[]> { return (source$: Observable<T>) => source$.pipe( bufferTime(batchTimeout), filter(batch => batch.length > 0), mergeMap(batch => { // 如果批次大小超过阈值,分割成更小的批次 if (batch.length > batchSize) { const batches: T[][] = []; for (let i = 0; i < batch.length; i += batchSize) { batches.push(batch.slice(i, i + batchSize)); } return from(batches); } return of(batch); }) );}// 使用interval(100).pipe( take(25), batch(10, 500)).subscribe(batch => { console.log('Batch:', batch);});最佳实践1. 保持操作符纯粹// ✅ 好的做法:纯粹的操作符function double<T extends number>(): OperatorFunction<T, T> { return map(x => x * 2 as T);}// ❌ 不好的做法:有副作用的操作符function doubleWithSideEffect<T extends number>(): OperatorFunction<T, T> { return map(x => { console.log('Doubling:', x); // 副作用 return x * 2 as T; });}2. 提供合理的默认值function retry<T>( maxRetries: number = 3, delayMs: number = 1000): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) throw error; return retryCount + 1; }, 0), delay(delayMs) ) ) );}3. 正确处理错误function safeMap<T, R>( project: (value: T) => R, errorHandler?: (error: any) => R): OperatorFunction<T, R> { return (source$: Observable<T>) => source$.pipe( map(value => { try { return project(value); } catch (error) { if (errorHandler) { return errorHandler(error); } throw error; } }) );}4. 
提供类型安全function filterByProperty<T, K extends keyof T>( property: K, value: T[K]): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( filter(item => item[property] === value) );}// 使用interface User { id: number; name: string; role: 'admin' | 'user';}of<User>( { id: 1, name: 'Alice', role: 'admin' }, { id: 2, name: 'Bob', role: 'user' }).pipe( filterByProperty('role', 'admin')).subscribe(user => { console.log('Admin:', user.name);});测试自定义操作符import { TestScheduler } from 'rxjs/testing';describe('Custom Operators', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); it('should double values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe(double()); expectObservable(result$).toBe(expected, { a: 1, b: 2, c: 3, A: 2, B: 4, C: 6 }); }); });});总结创建自定义 RxJS 操作符的关键点:使用 pipeable 操作符: 推荐使用 OperatorFunction 类型保持纯粹: 避免不必要的副作用正确处理错误: 提供错误处理机制提供类型安全: 使用 TypeScript 泛型合理默认值: 提供合理的默认参数可测试性: 确保操作符易于测试文档完善: 提供清晰的文档和示例自定义操作符可以显著提升代码的可读性和可维护性,是 RxJS 高级开发的重要技能。