写一个简单的工作流(三)

简介:
上午测试了下并发情况下的表现,测试场景:一个有20个节点,包括选择、顺序、并行路由的流程,所有节点都设置为自动执行,1000个线程并发启动案例,也就是这个流程同时有1000个案例在跑,全部跑完结果差强人意,比单线程慢了接近30倍。仔细调整了算法和加锁粒度,尽管整体性能有所提高,但是多线程和单线程间执行的差距仍然没有多大变化,性能瓶颈还是在核心的调度算法上,还需要分析下。测试程序如下:
package  net.rubyeye.insect.workflow.test;

import  java.util.ArrayList;
import  java.util.List;
import  java.util.concurrent.CyclicBarrier;

import  net.rubyeye.insect.workflow.Place;
import  net.rubyeye.insect.workflow.Token;
import  net.rubyeye.insect.workflow.Transition;
import  net.rubyeye.insect.workflow.WorkFlow;
import  net.rubyeye.insect.workflow.WorkFlowManager;
import  net.rubyeye.insect.workflow.basic.BasicWorkflowManager;
import  net.rubyeye.insect.workflow.comm.TransitionType;
import  net.rubyeye.insect.workflow.config.DefaultConfiguration;
import  junit.framework.TestCase;

public   class  CompositeProcessTest  extends  TestCase {
    
private  WorkFlowManager wm;

    WorkFlow composite;

    
private  CyclicBarrier barrier;

    
private   static   final   int  total  =   1000 ;

    @Override
    
protected   void  setUp()  throws  Exception {
        
this .barrier  =   new  CyclicBarrier(total  +   1 );
        wm 
=   new  BasicWorkflowManager();
        wm.setConfiguration(
new  DefaultConfiguration());

        WorkFlow sequence 
=  wm.getWorkFlow( " sequence " );
        WorkFlow concurrency 
=  wm.getWorkFlow( " concurrency " );
        WorkFlow choose 
=  wm.getWorkFlow( " choose " );

        
//  组合流程
        composite  =   new  WorkFlow();
        composite.setName(
" composite " );
        composite.setId(
100 );

        wm.saveWorkFlow(composite);

        
//  修改开始结束节点的输入输出库所
        sequence.getEnd().setType(TransitionType.NORMAL);
        sequence.getEnd().setOutputs(concurrency.getStart().getInputs());

        concurrency.getEnd().setType(TransitionType.NORMAL);
        concurrency.getEnd().setOutputs(choose.getStart().getInputs());

        composite.setStart(sequence.getStart());
        composite.setEnd(choose.getEnd());
        List
< Transition >  transitions  =   new  ArrayList < Transition > ();
        transitions.addAll(sequence.getTransitions());
        transitions.addAll(concurrency.getTransitions());
        transitions.addAll(choose.getTransitions());
        composite.setTransitions(transitions);
    }

    
public   void  testConcurrencyCompositeProcesss()  throws  Exception {
        
for  ( int  i  =   0 ; i  <  total; i ++ ) {
            
new  FlowThread().start();
        }
        barrier.await();
        
long  start  =  System.currentTimeMillis();
        barrier.await();
        
long  end  =  System.currentTimeMillis();
        System.out.println(
" 创建 "   +  total  +   " 个流程并发运行完毕\n花费时间: "   +  (end  -  start)
                
/   1000.0   +   " " );
        
for  (Transition transition : composite.getTransitions()) {
            System.out.println(transition.getName() 
+   "    "
                    
+  transition.isEnable());
            
for  (Place place : transition.getOutputs()) {
                System.out.println(
" place  "   +  place.getId()  +   "   "
                        
+  place.getTokens().size());
            }
        }
    }

    
public   void  testCompositeProcesss()  throws  Exception {
        
long  start  =  System.currentTimeMillis();
        
for  ( int  i  =   0 ; i  <  total; i ++ ) {
            Token token1 
=  wm.startWorkFlow( " composite " );
            token1.setAttribute(
" name " " dennis " );
            token1.setAttribute(
" num " 21 );
            wm.doAction(token1.getId());
            assertTrue(token1.isFinished());
        }
        
long  end  =  System.currentTimeMillis();
        System.out.println(
" 创建 "   +  total  +   " 个流程运行完毕\n花费时间: "   +  (end  -  start)
                
/   1000.0   +   " " );
    }

    
class  FlowThread  extends  Thread {

        @Override
        
public   void  run() {
            
try  {
                barrier.await();
                
//  wm = new BasicWorkflowManager();
                Token token1  =  wm.startWorkFlow( " composite " );
                token1.setAttribute(
" name " " dennis " );
                token1.setAttribute(
" num " 21 );
                wm.doAction(token1.getId());
                assertTrue(token1.isFinished());
                barrier.await();
            } 
catch  (Exception e) {
                
throw   new  RuntimeException(e);
            }
        }

    }
}
文章转自庄周梦蝶  ,原文发布时间 2007-10-12
目录
相关文章
|
11天前
工作流介绍
工作流介绍
|
10月前
工作流
工作流
121 0
|
11月前
snakerflow工作流实践分享
snakerflow工作流实践分享
195 0
|
Serverless
函数工作流
函数工作流自制脑图
87 0
函数工作流
powerjob配置工作流
powerjob工作流内配置依赖关系及判断节点
780 0
powerjob配置工作流
|
存储 弹性计算 监控
浅析数据工作流Prefect
简述 Prefect 是一种新的工作流管理系统,专为现代基础设施而设计,由开源的 Prefect Core 工作流引擎提供支持。 用户只需将任务组织成流程,Prefect 负责其余的工作,可让您非常容易使用数据工作流并添加重试、日志记录、动态映射、缓存、失败通知等语义。
|
开发工具 git
GitFlow工作流
GitFlow工作流
|
运维 Shell 调度