Concept of Custom Operators
Custom operators allow you to create reusable RxJS operators that encapsulate specific business logic or data processing patterns. This helps improve code readability, maintainability, and reusability.
Methods to Create Custom Operators
1. Using Observable.create
The most basic method, directly creating an Observable.
javascriptimport { Observable } from 'rxjs'; function myCustomOperator<T>(source$: Observable<T>): Observable<T> { return new Observable(subscriber => { return source$.subscribe({ next: value => { // Process each value subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); }); } // Usage of(1, 2, 3).pipe( myCustomOperator() ).subscribe(console.log);
2. Using Pipeable Operators
Recommended method, creating pipeable operators.
javascriptimport { Observable, OperatorFunction } from 'rxjs'; function myCustomOperator<T>(): OperatorFunction<T, T> { return (source$: Observable<T>) => new Observable(subscriber => { return source$.subscribe({ next: value => { // Processing logic subscriber.next(value); }, error: error => { subscriber.error(error); }, complete: () => { subscriber.complete(); } }); }); } // Usage of(1, 2, 3).pipe( myCustomOperator() ).subscribe(console.log);
3. Combining Existing Operators
Create new operators by combining existing ones.
javascriptimport { Observable, pipe } from 'rxjs'; import { map, filter, tap } from 'rxjs/operators'; function processAndFilter<T>( predicate: (value: T) => boolean, processor: (value: T) => T ) { return pipe( map(processor), filter(predicate), tap(value => console.log('Processed:', value)) ); } // Usage of(1, 2, 3, 4, 5).pipe( processAndFilter( x => x > 2, x => x * 2 ) ).subscribe(console.log); // Output: 6, 8, 10
Practical Application Examples
1. Logging Operator
javascriptimport { Observable, OperatorFunction } from 'rxjs'; import { tap } from 'rxjs/operators'; function log<T>(prefix: string = ''): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap({ next: value => console.log(`${prefix}Next:`, value), error: error => console.error(`${prefix}Error:`, error), complete: () => console.log(`${prefix}Complete`) }) ); } // Usage of(1, 2, 3).pipe( log('Data: ') ).subscribe(console.log); // Output: // Data: Next: 1 // Data: Next: 2 // Data: Next: 3 // Data: Complete
2. Retry Operator
javascriptimport { Observable, OperatorFunction, throwError, of } from 'rxjs'; import { retryWhen, delay, take, scan, tap } from 'rxjs/operators'; function retryWithBackoff<T>( maxRetries: number = 3, delayMs: number = 1000 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) { throw error; } return retryCount + 1; }, 0), tap(retryCount => console.log(`Retry attempt ${retryCount + 1}/${maxRetries}`) ), delay(delayMs) ) ) ); } // Usage of(1).pipe( map(() => { throw new Error('Network error'); }), retryWithBackoff(3, 1000) ).subscribe({ next: console.log, error: error => console.error('Failed:', error.message) });
3. Caching Operator
javascriptimport { Observable, OperatorFunction } from 'rxjs'; import { shareReplay, tap } from 'rxjs/operators'; function cache<T>( bufferSize: number = 1, windowTime: number = 0 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( tap(() => console.log('Fetching data...')), shareReplay(bufferSize, windowTime) ); } // Usage const data$ = http.get('/api/data').pipe( cache(1, 60000) // Cache 1 value for 60 seconds ); // First call will make request data$.subscribe(data => console.log('First:', data)); // Second call will use cache setTimeout(() => { data$.subscribe(data => console.log('Second:', data)); }, 1000);
4. Debounce Operator
javascriptimport { Observable, OperatorFunction, timer } from 'rxjs'; import { debounceTime, distinctUntilChanged } from 'rxjs/operators'; function smartDebounce<T>( delayMs: number = 300, comparator: (prev: T, curr: T) => boolean = (a, b) => a === b ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( debounceTime(delayMs), distinctUntilChanged(comparator) ); } // Usage fromEvent(inputElement, 'input').pipe( map(event => event.target.value), smartDebounce(300) ).subscribe(value => { console.log('Debounced value:', value); });
5. Pagination Operator
javascriptimport { Observable, OperatorFunction, from } from 'rxjs'; import { concatMap, scan, map } from 'rxjs/operators'; function paginate<T>( pageSize: number, fetchPage: (page: number) => Observable<T[]> ): OperatorFunction<void, T> { return (source$: Observable<void>) => source$.pipe( concatMap(() => { let currentPage = 0; let allItems: T[] = []; return new Observable<T>(subscriber => { function loadNextPage() { fetchPage(currentPage).pipe( map(items => { allItems = [...allItems, ...items]; items.forEach(item => subscriber.next(item)); if (items.length < pageSize) { subscriber.complete(); } else { currentPage++; loadNextPage(); } }) ).subscribe(); } loadNextPage(); }); }) ); } // Usage function fetchPage(page: number): Observable<number[]> { return of( Array.from({ length: 10 }, (_, i) => page * 10 + i) ).pipe(delay(500)); } of(undefined).pipe( paginate(10, fetchPage) ).subscribe(item => { console.log('Item:', item); });
6. Error Recovery Operator
javascriptimport { Observable, OperatorFunction, of, throwError } from 'rxjs'; import { catchError } from 'rxjs/operators'; function withFallback<T>( fallbackValue: T, shouldFallback: (error: any) => boolean = () => true ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( catchError(error => { if (shouldFallback(error)) { console.warn('Using fallback value'); return of(fallbackValue); } return throwError(() => error); }) ); } // Usage http.get('/api/data').pipe( withFallback([], error => error.status === 404) ).subscribe(data => { console.log('Data:', data); });
Advanced Custom Operators
1. State Management Operator
javascriptimport { Observable, OperatorFunction, BehaviorSubject } from 'rxjs'; import { tap, switchMap } from 'rxjs/operators'; function withState<T, S>( initialState: S, reducer: (state: S, value: T) => S ): OperatorFunction<T, [T, S]> { return (source$: Observable<T>) => { const state$ = new BehaviorSubject<S>(initialState); return source$.pipe( tap(value => { const newState = reducer(state$.value, value); state$.next(newState); }), switchMap(value => state$.pipe( map(state => [value, state] as [T, S]) ) ) ); }; } // Usage of(1, 2, 3, 4, 5).pipe( withState(0, (sum, value) => sum + value) ).subscribe(([value, sum]) => { console.log(`Value: ${value}, Sum: ${sum}`); }); // Output: // Value: 1, Sum: 1 // Value: 2, Sum: 3 // Value: 3, Sum: 6 // Value: 4, Sum: 10 // Value: 5, Sum: 15
2. Performance Monitoring Operator
javascriptimport { Observable, OperatorFunction } from 'rxjs'; import { tap, finalize } from 'rxjs/operators'; function measurePerformance<T>( label: string = 'Operation' ): OperatorFunction<T, T> { return (source$: Observable<T>) => { const startTime = performance.now(); return source$.pipe( tap({ next: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Next: ${elapsed.toFixed(2)}ms`); }, complete: () => { const elapsed = performance.now() - startTime; console.log(`${label} - Complete: ${elapsed.toFixed(2)}ms`); } }), finalize(() => { const elapsed = performance.now() - startTime; console.log(`${label} - Total: ${elapsed.toFixed(2)}ms`); }) ); }; } // Usage http.get('/api/data').pipe( measurePerformance('API Call') ).subscribe(data => { console.log('Data:', data); });
3. Batch Processing Operator
javascriptimport { Observable, OperatorFunction, timer } from 'rxjs'; import { bufferTime, filter, mergeMap } from 'rxjs/operators'; function batch<T>( batchSize: number, batchTimeout: number = 1000 ): OperatorFunction<T, T[]> { return (source$: Observable<T>) => source$.pipe( bufferTime(batchTimeout), filter(batch => batch.length > 0), mergeMap(batch => { // If batch size exceeds threshold, split into smaller batches if (batch.length > batchSize) { const batches: T[][] = []; for (let i = 0; i < batch.length; i += batchSize) { batches.push(batch.slice(i, i + batchSize)); } return from(batches); } return of(batch); }) ); } // Usage interval(100).pipe( take(25), batch(10, 500) ).subscribe(batch => { console.log('Batch:', batch); });
Best Practices
1. Keep Operators Pure
javascript// ✅ Good: Pure operator function double<T extends number>(): OperatorFunction<T, T> { return map(x => x * 2 as T); } // ❌ Bad: Operator with side effects function doubleWithSideEffect<T extends number>(): OperatorFunction<T, T> { return map(x => { console.log('Doubling:', x); // Side effect return x * 2 as T; }); }
2. Provide Reasonable Defaults
javascriptfunction retry<T>( maxRetries: number = 3, delayMs: number = 1000 ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( retryWhen(errors => errors.pipe( scan((retryCount, error) => { if (retryCount >= maxRetries) throw error; return retryCount + 1; }, 0), delay(delayMs) ) ) ); }
3. Handle Errors Correctly
javascriptfunction safeMap<T, R>( project: (value: T) => R, errorHandler?: (error: any) => R ): OperatorFunction<T, R> { return (source$: Observable<T>) => source$.pipe( map(value => { try { return project(value); } catch (error) { if (errorHandler) { return errorHandler(error); } throw error; } }) ); }
4. Provide Type Safety
javascriptfunction filterByProperty<T, K extends keyof T>( property: K, value: T[K] ): OperatorFunction<T, T> { return (source$: Observable<T>) => source$.pipe( filter(item => item[property] === value) ); } // Usage interface User { id: number; name: string; role: 'admin' | 'user'; } of<User>( { id: 1, name: 'Alice', role: 'admin' }, { id: 2, name: 'Bob', role: 'user' } ).pipe( filterByProperty('role', 'admin') ).subscribe(user => { console.log('Admin:', user.name); });
Testing Custom Operators
javascriptimport { TestScheduler } from 'rxjs/testing'; describe('Custom Operators', () => { let testScheduler: TestScheduler; beforeEach(() => { testScheduler = new TestScheduler((actual, expected) => { expect(actual).toEqual(expected); }); }); it('should double values', () => { testScheduler.run(({ cold, expectObservable }) => { const source$ = cold('-a-b-c-|'); const expected = '-A-B-C-|'; const result$ = source$.pipe(double()); expectObservable(result$).toBe(expected, { a: 1, b: 2, c: 3, A: 2, B: 4, C: 6 }); }); }); });
Summary
Key points for creating custom RxJS operators:
- Use Pipeable Operators: Recommended to use OperatorFunction type
- Keep Pure: Avoid unnecessary side effects
- Handle Errors Correctly: Provide error handling mechanisms
- Provide Type Safety: Use TypeScript generics
- Reasonable Defaults: Provide reasonable default parameters
- Testability: Ensure operators are easy to test
- Complete Documentation: Provide clear documentation and examples
Custom operators can significantly improve code readability and maintainability, and are an important skill for advanced RxJS development.