美文网首页
taskflow各种场景案例

taskflow各种场景案例

作者: hugoren | 来源:发表于2018-03-30 16:45 被阅读0次

HelloWorld

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task


# INTRO: This is the defacto hello world equivalent for taskflow; it shows how
# an overly simplistic workflow can be created that runs using different
# engines using different styles of execution (all can be used to run in
# parallel if a workflow is provided that is parallelizable).

class PrinterTask(task.Task):
    def __init__(self, name, show_name=True, inject=None):
        super(PrinterTask, self).__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        if self._show_name:
            print("%s: %s" % (self.name, output))
        else:
            print(output)


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows when ran can be ran in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
for (name, hello, world) in [('bob', 'hello', 'world'),
                             ('joe', 'hellooo', 'worllllld'),
                             ('sue', "helloooooo!", 'wooorllld!')]:
    hi_chorus.add(PrinterTask("%s@hello" % name,
                              # This will show up to the execute() method of
                              # the task as the argument named 'output' (which
                              # will allow us to print the character we want).
                              inject={'output': hello}))
    world_chorus.add(PrinterTask("%s@world" % name,
                                 inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
song.add(PrinterTask("conductor@begin",
                     show_name=False, inject={'output': "*ding*"}),
         hi_chorus,
         world_chorus,
         PrinterTask("conductor@end",
                     show_name=False, inject={'output': "*dong*"}))

# Run in parallel using eventlet green threads...
try:
    import eventlet as _eventlet  # noqa
except ImportError:
    # No eventlet currently active, skip running with it...
    pass
else:
    print("-- Running in parallel using eventlet --")
    e = engines.load(song, executor='greenthreaded', engine='parallel',
                     max_workers=1)
    e.run()


# Run in parallel using real threads...
print("-- Running in parallel using threads --")
e = engines.load(song, executor='threaded', engine='parallel',
                 max_workers=1)
e.run()


# Run in parallel using external processes...
print("-- Running in parallel using processes --")
e = engines.load(song, executor='processes', engine='parallel',
                 max_workers=1)
e.run()


# Run serially (aka, if the workflow could have been ran in parallel, it will
# not be when ran in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
print("-- Statistics gathered --")
print(e.statistics)

Passing values from and to tasks

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can be then consumed/used by a downstream task.


class TaskA(task.Task):
    default_provides = 'a'

    def execute(self):
        print("Executing '%s'" % (self.name))
        return 'a'


class TaskB(task.Task):
    def execute(self, a):
        print("Executing '%s'" % (self.name))
        print("Got input '%s'" % (a))


print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")
Using listeners
import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed,
# this example dumps that output to the stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()

转自:
https://docs.openstack.org/taskflow/latest/user/examples.html#hello-world

相关文章

  • taskflow各种场景案例

    HelloWorld Passing values from and to tasks Using listene...

  • TaskFlow DAG部分源码阅读

    Taskflow有很多功能,例如动态时构建子图,条件节点,并行运算,静态图。这次学习着重于Taskflow对于静态...

  • 案例编写心得2

    编写案例的时候,你需要考虑到各种各样的情况,考虑到各种场景,而各种场景,就需要对实现的业务逻辑熟悉。 但是会存在一...

  • TaskFlow任务流

    最近学习了然后仿写了一个任务流来处理首次打开App启动问题,下面的介绍也基于启动App的情况下介绍的,写完发现才先...

  • 《蔡康永的说话之道》

    蔡康永在书中列举了八十个案例,场景涉及生活、职场等各种常见画面。每一个案例讲解都是一条说话心得,如“把对方看在眼里...

  • 优云双态运维分享之:业务场景驱动的服务型CMDB

    最近这几年,国内外CMDB失败的案例比比皆是,成功的寥寥可数,有人质疑CMDB is dead?但各种业务场景表明...

  • 卖点证据 更深入利益

    模型 产品规格,生产流程, 数据,案例 ,场景图片 直接说更多利益 图片 案例 帮助想象利益场景 更好的体验 场...

  • 婚庆服务小程序案例

    案例一:郑州本质摄影工作室 案例二:钢舞IDO爱都 案例三:Z场景设计 案例四:闪电婚车 案例五:婚纱摄影说

  • 云迁移的各种场景

    一、物理服务器/其他云服务迁移至阿里云 1.1 IDC的物理服务器、虚拟机,其他云平台的服务器迁移到阿里云 阿里云...

  • React各种场景Ref用法

    前言 仅记录不同场景下ref使用,不断补充 解决方案 父组件class,子组件hooks场景 父组件中 子组件 父...

网友评论

      本文标题:taskflow各种场景案例

      本文链接:https://www.haomeiwen.com/subject/pcmicftx.html