buffer
用数组收集一个流发出的值,直到另一个流发出值,就把当前已收集到的值发出并释放
const clicks = fromEvent(document, 'click');
const intervalEvents = interval(1000);
const buffered = intervalEvents.pipe(buffer(clicks));
buffered.subscribe(x => console.log(x))
定时器一直在执行,直到click事件触发时,把之前累计的定时器数组发送出去
bufferCount
用数组收集一个流发出的值,直到达到给定值的最大值(
bufferSize)
const intervalEvents = interval(1000)
// 只要收集3个值,就将值推送出去
const buffered = intervalEvents.pipe(bufferCount(3));
buffered.subscribe(x => console.log(x))
//[0,1,2]
//[3,4,5]
//[6,7,8]
//……
如果指定了
bufferCount中的第二个参数startBufferEvery,意味着每次新一轮的收集会缓存前一次收集的startBufferEvery值
const intervalEvents = interval(1000)
// 只要收集3个值,就将值推送出去
const buffered = intervalEvents.pipe(bufferCount(3,1));
buffered.subscribe(x => console.log(x))
//这里的(3,1),表示'startBufferEventy'从上次下标为1的位置开始收集复用
//[0,1,2]
//[2,3,4]
//[4,5,6]
//……
bufferTime
每隔指定的时间发出收集到的值
const intervalEvents = interval(200)
const buffered = intervalEvents.pipe(bufferTime(1000))
buffered.subscribe(x => console.log(x))
//[0,1,2,3]
//[4,5,6,7,8]
//……
bufferToggle
在指定的缓冲时间段内收集所有的值,到时间后关闭该时间段并发出所有的值
const sourceInterval = interval(1000)
const startInterval = interval(5000)
const closingInterval = val => {
console.log(`${val}开始缓冲,3S后关闭!`)
return interval(3000)
}
const bufferToggleInterval = sourceInterval.pipe(
bufferToggle(startInterval, closingInterval)
);
const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer', val))
//0开始缓冲,3S后关闭!
//Emitted Buffer: (4) [4, 5, 6, 7]
//1开始缓冲,3S后关闭!
//Emitted Buffer: (4) [9, 10, 11, 12]
每5秒开启一个新的缓冲区以收集发出的值,3秒后发出缓冲的值,并关闭当前缓冲区
bufferWhen
收集值,直到指定的Observable发出值
const oneSecondInterval = interval(1000)
const clicks = fromEvent(document, 'click')
const bufferWhenExample = oneSecondInterval.pipe(bufferWhen(() => clicks))
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer:', val))
// oneSecondInterval持续触发,直到bufferWhen中的clicks事件触发,才去收集值
concatMap
将源Observalbe发出的每个值,按顺序映射成一个新的Observalbe
const source = of(10, 100)
const example = source.pipe(concatMap(val => of(val * 2)))
const subscribe = example.subscribe(x => console.log('result * 2 => ', x))
// result*2 => 20
// result*2 => 200
concatMapTo
将源Observable发出的每个值,按顺序映射成一个固定值 (不限于Observable)
const source = of(10, 100)
const example = source.pipe(concatMapTo(of('aa')))
const subscribe = example.subscribe(x => console.log(x))
//打印两次aa
exhasust
在一个Observable完成之前,会忽略所有其它的Observable
const clicks = fromEvent(document, 'click')
const higherOrder = clicks.pipe(
map((ev) => interval(1000).pipe(take(5)))
);
const result = higherOrder.pipe(exhaust())
result.subscribe(x => console.log(x))
//点击打印1 2 3 4 5,在这期间即使再点击也会被忽略
exhasustMap
可以理解为map + exhaust的结合,将上面的例子简化为:
const clicks = fromEvent(document, 'click')
const higherOrder = clicks.pipe(
exhaustMap((ev) => interval(1000).pipe(take(5)))
);
higherOrder.subscribe(x => console.log(x))
mergeMap
把源Observalbe发出的每个值,映射成新的值(不限于Observalbe)
const source = of('Hello')
const myPromise = val => new Promise(resolve => resolve(`${val} World From Promise!`))
const example = source.pipe(mergeMap(val => myPromise(val)))
const subscribe = example.subscribe(val => console.log(val))
//Hello World From Promise!
mergeMapTo
将源Observable发出的每个值,映射成新的值(不限于Observalbe)
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(mergeMapTo(interval(1000)))
result.subscribe(x => console.log(x))
//将每次点击触发的各个流合并在一起,互不冲突
scan
在源Observable上应用累加函数,每次累加后返回一个Observalbe
const click$ = fromEvent(document, 'click').pipe(mapTo(1));
const count$ = click$.pipe(
scan((acc, one) => acc + one, 0)
)
count$.subscribe(x => console.log(x))
//每次点击都+1
mergeScan
类似scan,在源Observable上应用累计器函数,每次累加后返回一个Observable流。重写上面的例子:
const click$ = fromEvent(document, 'click').pipe(mapTo(1));
const count$ = click$.pipe(
mergeScan((acc, one) => of(acc + one), 0)
)
count$.subscribe(x => console.log(x))
reduce
类似scan,只不过会等源Observable完成时将结果发出
const source = of(1, 2, 3, 4)
const example = source.pipe(reduce((acc, val) => acc + val))
const subscribe = example.subscribe(val => console.log('Sum:', val))
//与scan不同,reduce不会返回每次累加的结果,它只返回最终的结果
//Sum:10
pairwise
将发出的值两两组合起来,复用上一个值
const pairs = interval(1000).pipe(pairwise())
pairs.subscribe(x => console.log(x))
//[0,1]
//[1,2]
//……
groupBy
将源Observable按条件分组发出
of(
{ id: 1, name: 'JavaScript' },
{ id: 2, name: 'Parcel' },
{ id: 2, name: 'webpack' },
{ id: 1, name: 'TypeScript' },
{ id: 3, name: 'TSLint' }
).pipe(
groupBy(p => p.id),
mergeMap(group => group.pipe(toArray()))
).subscribe(p => console.log(p))
//[ {id: 1, name: "JavaScript"},{id: 1, name: "TypeScript"}]
//[ {id: 2, name: "Parcel"},{id: 2, name: "webpack"}]
//[ {id: 3, name: "TSLint"}]
pluck
选择属性来发出
const clicks = fromEvent(document, 'click')
const tagName = clicks.pipe(pluck('target', 'tagName'))
tagName.subscribe(x => console.log(x))
//打印当前点击的标签
提取对象属性
const source = of(
{ name: 'lxc', age: 18 },
{ name: 'Jerry', age: 35 }
)
const example = source.pipe(pluck('name'))
const subscribe = example.subscribe(val => console.log(val))
//lxc
//Jerry
switchMap
把源Observable发出的每个值经过处理,映射成新的Observable,但每次都会取消上一次的Observale,不管上次Observalbe是否完成
const clicks = fromEvent(document, 'click')
const result = clicks.pipe(switchMap((ev) => interval(1000)))
result.subscribe(x => console.log(x))
//每次触发click时,会清除上一次的interval,重新计数,因为同一时间只会运行一个流
swtichMapTo
const clicks = fromEvent(document, 'click')
const result = clicks.pipe(switchMapTo(interval(1000)))
result.subscribe(x => console.log(x))










网友评论