乐闻世界logo
搜索文章和话题

How to create custom operators in RxJS?

2月21日 16:23

Concept of Custom Operators

Custom operators allow you to create reusable RxJS operators that encapsulate specific business logic or data processing patterns. This helps improve code readability, maintainability, and reusability.

Methods to Create Custom Operators

1. Using Observable.create

The most basic method, directly creating an Observable.

javascript
import { Observable } from 'rxjs'; function myCustomOperator<T>(source$: Observable<T>): Observable<T> { return new Observable(subscriber => { return source$.subscribe({ next: value => { // Process each value subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); }); } // Usage of(1, 2, 3).pipe( myCustomOperator() ).subscribe(console.log);

2. Using Pipeable Operators

Recommended method, creating pipeable operators.

javascript
import { Observable, OperatorFunction } from 'rxjs'; function myCustomOperator<T>(): OperatorFunction<T, T> { return (source$: Observable<T>) => new Observable(subscriber => { return source$.subscribe({ next: value => { // Processing logic subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); }); } // Usage of(1, 2, 3).pipe( myCustomOperator() ).subscribe(console.log);

3. Combining Existing Operators

Create new operators by combining existing ones.

javascript
import { Observable, pipe } from 'rxjs'; import { map, filter, tap } from 'rxjs/operators'; function processAndFilter<T>( predicate: (value: T) => boolean, processor: (value: T) => T ) { return pipe( map(processor), filter(predicate), tap(value => console.log('Processed:', value)) ); } // Usage of(1, 2, 3, 4, 5).pipe( processAndFilter( x => x > 2, x => x * 2 ) ).subscribe(console.log); // Output: 6, 8, 10

Practical Application Examples

1. Logging Operator

javascript
import { Observable, OperatorFunction } from 'rxjs'; import { tap } from 'rxjs/operators'; function log<T>(prefix: string = ''): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap({ next: value => console.log(`${prefix}Next:`, value), error: error => console.error(`${prefix}Error:`, error), complete: () => console.log(`${prefix}Complete`) }) ); } // Usage of(1, 2, 3).pipe( log('Data: ') ).subscribe(console.log); // Output: // Data: Next: 1 // Data: Next: 2 // Data: Next: 3 // Data: Complete

2. Retry Operator

javascript
import { Observable, OperatorFunction, throwError, of } from 'rxjs'; import { retryWhen, delay, take, scan, tap } from 'rxjs/operators'; function retryWithBackoff<T>( maxRetries: number = 3, delayMs: number = 1000 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) { throw error; } return retryCount + 1; }, 0), tap(retryCount => console.log(`Retry attempt ${retryCount + 1}/${maxRetries}`) ), delay(delayMs) ) ) ); } // Usage of(1).pipe( map(() => { throw new Error('Network error'); }), retryWithBackoff(3, 1000) ).subscribe({ next: console.log, error: error => console.error('Failed:', error.message) });

3. Caching Operator

javascript
import { Observable, OperatorFunction } from 'rxjs'; import { shareReplay, tap } from 'rxjs/operators'; function cache<T>( bufferSize: number = 1, windowTime: number = 0 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap(() => console.log('Fetching data...')), shareReplay(bufferSize, windowTime) ); } // Usage const data$ = http.get('/api/data').pipe( cache(1, 60000) // Cache 1 value for 60 seconds ); // First call will make request data$.subscribe(data => console.log('First:', data)); // Second call will use cache setTimeout(() => { data$.subscribe(data => console.log('Second:', data)); }, 1000);

4. Debounce Operator

javascript
import { Observable, OperatorFunction, timer } from 'rxjs'; import { debounceTime, distinctUntilChanged } from 'rxjs/operators'; function smartDebounce<T>( delayMs: number = 300, comparator: (prev: T, curr: T) => boolean = (a, b) => a === b ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( debounceTime(delayMs), distinctUntilChanged(comparator) ); } // Usage fromEvent(inputElement, 'input').pipe( map(event => event.target.value), smartDebounce(300) ).subscribe(value => { console.log('Debounced value:', value); });

5. Pagination Operator

javascript
import { Observable, OperatorFunction, from } from 'rxjs'; import { concatMap, scan, map } from 'rxjs/operators'; function paginate<T>( pageSize: number, fetchPage: (page: number) => Observable<T[]> ): OperatorFunction<void, T> { return (source$: Observable<void>) => source$.pipe( concatMap(() => { let currentPage = 0; let allItems: T[] = []; return new Observable<T>(subscriber => { function loadNextPage() { fetchPage(currentPage).pipe( map(items => { allItems = [...allItems, ...items]; items.forEach(item => subscriber.next(item)); if (items.length < pageSize) { subscriber.complete(); } else { currentPage++; loadNextPage(); } }) ).subscribe(); } loadNextPage(); }); }) ); } // Usage function fetchPage(page: number): Observable<number[]> { return of( Array.from({ length: 10 }, (_, i) => page * 10 + i) ).pipe(delay(500)); } of(undefined).pipe( paginate(10, fetchPage) ).subscribe(item => { console.log('Item:', item); });

6. Error Recovery Operator

javascript
import { Observable, OperatorFunction, of, throwError } from 'rxjs'; import { catchError } from 'rxjs/operators'; function withFallback<T>( fallbackValue: T, shouldFallback: (error: any) => boolean = () => true ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( catchError(error => { if (shouldFallback(error)) { console.warn('Using fallback value'); return of(fallbackValue); } return throwError(() => error); }) ); } // Usage http.get('/api/data').pipe( withFallback([], error => error.status === 404) ).subscribe(data => { console.log('Data:', data); });

Advanced Custom Operators

1. State Management Operator

javascript
import { Observable, OperatorFunction, BehaviorSubject } from 'rxjs'; import { tap, switchMap } from 'rxjs/operators'; function withState<T, S>( initialState: S, reducer: (state: S, value: T) => S ): OperatorFunction<T, [T, S]> { return (source$: Observable<T>) => { const state$ = new BehaviorSubject<S>(initialState); return source$.pipe( tap(value => { const newState = reducer(state$.value, value); state$.next(newState); }), switchMap(value => state$.pipe( map(state => [value, state] as [T, S]) ) ) ); }; } // Usage of(1, 2, 3, 4, 5).pipe( withState(0, (sum, value) => sum + value) ).subscribe(([value, sum]) => { console.log(`Value: ${value}, Sum: ${sum}`); }); // Output: // Value: 1, Sum: 1 // Value: 2, Sum: 3 // Value: 3, Sum: 6 // Value: 4, Sum: 10 // Value: 5, Sum: 15

2. Performance Monitoring Operator

javascript
import { Observable, OperatorFunction } from 'rxjs'; import { tap, finalize } from 'rxjs/operators'; function measurePerformance<T>( label: string = 'Operation' ): OperatorFunction<T, T> { return (source$: Observable<T>) => { const startTime = performance.now(); return source$.pipe( tap({ next: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Next: ${elapsed.toFixed(2)}ms`); }, complete: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Complete: ${elapsed.toFixed(2)}ms`); } }), finalize(() => { const elapsed = performance.now() - startTime; console.log(`${label} - Total: ${elapsed.toFixed(2)}ms`); }) ); }; } // Usage http.get('/api/data').pipe( measurePerformance('API Call') ).subscribe(data => { console.log('Data:', data); });

3. Batch Processing Operator

javascript
import { Observable, OperatorFunction, timer } from 'rxjs'; import { bufferTime, filter, mergeMap } from 'rxjs/operators'; function batch<T>( batchSize: number, batchTimeout: number = 1000 ): OperatorFunction<T, T[]> { return (source$: Observable<T>) => source$.pipe( bufferTime(batchTimeout), filter(batch => batch.length > 0), mergeMap(batch => { // If batch size exceeds threshold, split into smaller batches if (batch.length > batchSize) { const batches: T[][] = []; for (let i = 0; i < batch.length; i += batchSize) { batches.push(batch.slice(i, i + batchSize)); } return from(batches); } return of(batch); }) ); } // Usage interval(100).pipe( take(25), batch(10, 500) ).subscribe(batch => { console.log('Batch:', batch); });

Best Practices

1. Keep Operators Pure

javascript
// ✅ Good: Pure operator function double<T extends number>(): OperatorFunction<T, T> { return map(x => x * 2 as T); } // ❌ Bad: Operator with side effects function doubleWithSideEffect<T extends number>(): OperatorFunction<T, T> { return map(x => { console.log('Doubling:', x); // Side effect return x * 2 as T; }); }

2. Provide Reasonable Defaults

javascript
function retry<T>( maxRetries: number = 3, delayMs: number = 1000 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) throw error; return retryCount + 1; }, 0), delay(delayMs) ) ) ); }

3. Handle Errors Correctly

javascript
function safeMap<T, R>( project: (value: T) => R, errorHandler?: (error: any) => R ): OperatorFunction<T, R> { return (source$: Observable<T>) => source$.pipe( map(value => { try { return project(value); } catch (error) { if (errorHandler) { return errorHandler(error); } throw error; } }) ); }

4. Provide Type Safety

javascript
function filterByProperty<T, K extends keyof T>( property: K, value: T[K] ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( filter(item => item[property] === value) ); } // Usage interface User { id: number; name: string; role: 'admin' | 'user'; } of<User>( { id: 1, name: 'Alice', role: 'admin' }, { id: 2, name: 'Bob', role: 'user' } ).pipe( filterByProperty('role', 'admin') ).subscribe(user => { console.log('Admin:', user.name); });

Testing Custom Operators

javascript
import { TestScheduler } from 'rxjs/testing'; describe('Custom Operators', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); it('should double values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe(double()); expectObservable(result$).toBe(expected, { a: 1, b: 2, c: 3, A: 2, B: 4, C: 6 }); }); }); });

Summary

Key points for creating custom RxJS operators:

  1. Use Pipeable Operators: Recommended to use OperatorFunction type
  2. Keep Pure: Avoid unnecessary side effects
  3. Handle Errors Correctly: Provide error handling mechanisms
  4. Provide Type Safety: Use TypeScript generics
  5. Reasonable Defaults: Provide reasonable default parameters
  6. Testability: Ensure operators are easy to test
  7. Complete Documentation: Provide clear documentation and examples

Custom operators can significantly improve code readability and maintainability, and are an important skill for advanced RxJS development.

标签:Rxjs