美文网首页
channel使用案例--数据流操控

channel使用案例--数据流操控

作者: golang推广大使 | 来源:发表于2019-03-13 23:35 被阅读0次

这部分将介绍一些使用channel进行数据流操控的案例。
整体来说,一个数据流应用包含许多模块。不同的模块做不同的工作。每个模块有一个或多个worker,这些worker并发的做同样的工作。下面是实践中的模块工作清单:

  • 数据生成、搜集,加载
  • 数据服务或保存
  • 数据计算或分析
  • 数据验证和过滤
  • 数据聚会和切分
  • 数据组合和解耦
  • 数据去重或扩散
    一个worker可能从多个其他模块接受数据,并把数据作为输出发送给其他模块。换句话说,一个模块既可以是数据生产者,也可以是消费者。一个只给其他模块发送数据的模块称为producer-only模块。反之,是consumer-only模块。
    许多模块一起构成一个数据流系统。
    下面将展示一些数据流模块的实现。这些实现被用于说明目的,所以他们很简单,但是不高效。

数据生成,搜集和加载

有各种各样的producer-only模块。一个producer-only模块可以通过下面的方式生成数据:

  • 加载文件,读取数据库,爬web
  • 从软件系统或各种硬件中搜集度量数据
  • 通过生成随机数
  • 等等其他方式。
    在这里,我们使用随机数生成器作为示例。生成器函数返回一个结果但不带参数。
import (
    "crypto/rand"
    "encoding/binary"
)

func RandomGenerator() <-chan uint64 {
    c := make(chan uint64)
    go func() {
        rnds := make([]byte, 8)
        for {
            _, err := rand.Read(rnds)
            if err != nil {
                close(c)
            }
            c <- binary.BigEndian.Uint64(rnds)
        }
    }()
    return c
}

事实上,随机数生成器是一个多返回值的future/promise.
一个数据生产商随时可以关闭输出的channel来结束数据生产。

数据聚合

数据聚合模块工作者将相同数据类型的若干数据流聚合到一个流中。
假设数据类型为int64,以下函数将任意数量的数据流聚合为一个。

func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
    output := make(chan uint64)
    for _, in := range inputs {
        in := in // this line is essential
        go func() {
            for {
                output <- <-in // <=> output <- (<-in)
            }
        }()
    }
    return output
}

更好的实现应该考虑输入流是否已关闭。 (也适用于以下其他模块工作者实现。)

func Aggregator(inputs ...<-chan uint64) <-chan uint64 {
    output := make(chan uint64)
    var wg sync.WaitGroup
    for _, in := range inputs {
        wg.Add(1)
        in := in // this line is essential
        go func() {
            for {
                x, ok := <-in
                if ok {
                    output <- x
                } else {
                    wg.Done()
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(output)
    }()
    return output
}

如果聚合数据流的数量非常小(两个或三个),我们可以使用select块来聚合这些数据流。

// Assume the number of input stream is two.
...
    output := make(chan uint64)
    go func() {
        inA, inB := inputs[0], inputs[1]
        for {
            select {
            case v := <- inA: output <- v
            case v := <- inB: output <- v
            }
        }
    }
...

数据分割

数据分割模块工作者与数据聚合模块工作者相反。实施分工很容易,但在实践中,分工不是很有用,很少使用。

func Divisor(input <-chan uint64, outputs ...chan<- uint64) {
    for _, out := range outputs {
        out := out // this line is essential
        go func() {
            for {
                out <- <-input // <=> out <- (<-input)
            }
        }()
    }
}

数据组合

数据组合工作者将来自不同输入数据流的若干数据合并为一个数据。
以下是组合工作者示例,其中来自一个流的两个uint64值和来自另一个流的一个uint64值组成一个新的uint64值。当然,这些流通道元件类型在实践中通常是不同的

func Composor(inA <-chan uint64, inB <-chan uint64) <-chan uint64 {
    output := make(chan uint64)
    go func() {
        for {
            a1, b, a2 := <-inA, <-inB, <-inA
            output <- a1 ^ b & a2
        }
    }()
    return output
}

数据重复/扩散

数据复制(扩散)可视为特殊数据分解。将复制一个数据,并将每个复制数据发送到不同的输出数据流。

func Duplicator(in <-chan uint64) (<-chan uint64, <-chan uint64) {
    outA, outB := make(chan uint64), make(chan uint64)
    go func() {
        for {
            x := <-in
            outA <- x
            outB <- x
        }
    }()
    return outA, outB
}

数据计算/分析

数据计算和分析模块的功能变化很大,每个都非常具体。通常,这种模块的工作者功能将每条输入数据转换成另一条输出数据。
出于简单的演示目的,这里显示了一个工作器示例,它反转每个传输的uint64值的每个位.

func Calculator(input <-chan uint64, output chan uint64) (<-chan uint64) {
    if output == nil {
        output = make(chan uint64)
    }
    go func() {
        for {
            x := <-input
            output <- ^x
        }
    }()
    return output
}

数据验证/过滤

数据验证或过滤模块丢弃流中的一些传输数据。例如,以下工作函数会丢弃所有非素数。

import "math/big"

func Filter(input <-chan uint64, output chan uint64) <-chan uint64 {
    if output == nil {
        output = make(chan uint64)
    }
    go func() {
        bigInt := big.NewInt(0)
        for {
            x := <-input
            bigInt.SetUint64(x)
            if bigInt.ProbablyPrime(1) {
                output <- x
            }
        }
    }()
    return output
}

数据服务/保存

通常,数据服务或保存模块是数据流系统中的最后或最终输出模块。这里只提供一个简单的工作器,它打印从输入流接收的每个数据。

import "fmt"

func Printer(input <-chan uint64) {
    for {
        x, ok := <-input
        if ok {
            fmt.Println(x)
        } else {
            return
        }
    }
}

数据流系统组装

现在,让我们使用上面的模块工作器函数来组装几个数据流系统。组装数据流系统只是为了创建一些不同模块的工作者,并为每个工作者指定输入流。
数据流系统示例1(线性管道):

package main

... // the worker functions declared above.

func main() {
    Printer(
        Filter(
            Calculator(
                RandomGenerator(),
            ),
        ),
    )
}

相关文章

  • channel使用案例--数据流操控

    这部分将介绍一些使用channel进行数据流操控的案例。整体来说,一个数据流应用包含许多模块。不同的模块做不同的工...

  • Channel使用案例分享

    本文将会展示许多channel使用案例。 我希望这篇文字能使你在以下几个方面更加便利: 用channel进行异步和...

  • Go 语言学习笔记-select、锁和条件变量

    select select 的作用:通过 select 可以监听 channel 上的数据流动。 select 的...

  • 进阶-2

    Select select 可见监听 Channel 上的数据流动; select 结构与 switch 的结构类...

  • 2021-Flutter

    1、flutter与原生通信,platform channel原理? 2、future 和 stream数据流 3...

  • go channel的常规用法

    循环获取channel 如果需要停止使用channel,需要手动将channel关闭 关闭后的channel还能获...

  • vue中的组件通讯问题

    一、父组件数据流向自组件 二、子组件数据流向父组件 ---完整案例 三、兄弟组件数据流

  • nio杂记

    buffers只有btyebuffer能被channel使用 线程在channel中中断,会使当前channel关...

  • Go select

    通过select语句可以监听channel上的数据流动 Golang的select语句类似于UNIX的select...

  • 上传图片的两种方式

    使用NSData数据流传图片 使用数据流上传单张图片 使用的是AFHTTPRequestOperationMan...

网友评论

      本文标题:channel使用案例--数据流操控

      本文链接:https://www.haomeiwen.com/subject/mboxmqtx.html