1、需求:
现有五个任务:A、B、(C、D)、E任务,其中C和D是需要并发运行的任务2、实现:
1)、看到并发首先想到fork...join,流程图如下:<?xml version="1.0" encoding="UTF-8"?><process name="test_demo" key="test" xmlns="http://jbpm.org/4.4/jpdl">
<start g="92,7,48,48" name="start1">
<transition name="to A" to="A" g="-29,-17"/>
</start>
<task name="A" g="73,73,92,52">
<assignment-handler class="test.TestUtil">
</assignment-handler>
<transition name="to B" to="B" g="-29,-17"/>
</task>
<task name="B" g="73,150,92,52">
<assignment-handler class="test.TestUtil">
</assignment-handler>
<transition name="to fork" to="fork" g="-47,-17"/>
</task>
<fork name="fork" g="96,223,48,48">
<transition name="to C" to="C" g="-29,-17"/>
<transition name="to D" to="D" g="-29,-17"/>
</fork>
<task name="C" g="15,291,92,52">
<assignment-handler class="test.TestUtil">
<field name="isFork">
<string value="true"/>
</field>
</assignment-handler>
<transition name="to join" to="join" g="-47,-17"/>
</task>
<task name="D" g="143,289,92,52" >
<assignment-handler class="test.TestUtil">
<field name="isFork">
<string value="true"/>
</field>
</assignment-handler>
<transition name="to join" to="join" g="-47,-17"/>
</task>
<join name="join" g="104,360,48,48" multiplicity="2">
<transition name="to E" to="E" g="-29,-17"/>
</join>
<task name="E" g="80,434,92,52">
<assignment-handler class="test.TestUtil">
</assignment-handler>
<transition name="to end" to="wait" g="-29,-17"/>
</task>
<end name="end" g="105,603,48,48"/>
</process>2)、TestUtil,只有当是并行节点才加入线程中,代码如下:public class TestUtil implements AssignmentHandler {
private String isFork;
public String getIsFork() {
return isFork;
}
public void setIsFork(String isFork) {
this.isFork = isFork;
}
public void assign(Assignable asgnbl, OpenExecution oe) throws Exception {
oe.setVariable("result", "false");//设置到数据库成功
if ("true".equals(isFork){
new Thread(new ThreadTest(oe)).start();
}
}
}3)、因为fork...join标签里面的子流程是虽然是并行的,不过是串行触发而不是并发,所以我考虑把任务C和D放到线程里执行。然后根据存在数据库里的result进行线程等待处理,即线程C和D分别执行完把result设置为true,主线程才继续往下走public class ThreadTest implements Runnable { private OpenExecution oe;
public ThreadTest(OpenExecution oe) {
this.oe = oe;
}
public void run() {
//synchronized (this) {
try {
oe.setVariable("fuck", "xxxxxxxxx");//这里报错
System.out.println("result |||||" + oe.getVariable(oe.getActivity().getName()) + "####" + oe.getVariable("fuck") + "****" + oe.getActivity().getName() + "&&&&" + oe.getActivity().getType());
} catch (Exception ex) {
oe.setVariable("error", "cnm");
ex.printStackTrace();
}
// }
}
public OpenExecution getOe() {
return oe;
}
public void setOe(OpenExecution oe) {
this.oe = oe;
}
}4)、实现类Test: public ProcessEngine processEngine;
public RepositoryService repositoryService;
public ExecutionService executionService;
public TaskService taskService;
public ProcessInstance processInstance; public Test() {
processEngine = Configuration.getProcessEngine();
executionService = processEngine.getExecutionService();
taskService = processEngine.getTaskService();
} /**
* 流程发布
* @param key
* @return
*/
public String deploy(String path, String key) {
String deploymentId = processEngine.getRepositoryService().createDeployment().addResourceFromClasspath(path).deploy();
System.out.println("========流程发布:" + deploymentId);
return deploymentId;
} /**
* 根据预留的KEY启动流程
* @return
*/
public String initStart(String key) {
processInstance = executionService.startProcessInstanceByKey(key, key);
System.out.println("========流程开始:" + processInstance.getId());
return processInstance.getId();
} /**
* 流程执行
* @param key : 流程定义的KEY值
* @param activityName : 传入的流程的节点名称
*/
public void execute(String key) {
String processInstanceId = initStart(key);
while (true) {
List<Task> tasklist2 = taskService.findPersonalTasks(null);
for (Task task : tasklist2) {
if (task.getExecutionId().indexOf("test.test") == 0) {
taskService.completeTask(task.getId());
}
}
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
}
}
}
public static void main(String[] args) {
Test j = new Test();
j.deploy("test/test.jpdl.xml", "test");
j.execute("test");
}
最后:报错信息:
Exception in thread "main" org.hibernate.HibernateException: instance not of expected entity type: org.jbpm.pvm.internal.type.variable.UnpersistableVariable is not a: org.jbpm.pvm.internal.type.Variable
at org.hibernate.persister.entity.AbstractEntityPersister.getSubclassEntityPersister(AbstractEntityPersister.java:3663)
at org.hibernate.impl.SessionImpl.getEntityPersister(SessionImpl.java:1374)
at org.hibernate.engine.ForeignKeys.isTransient(ForeignKeys.java:203)
at org.hibernate.event.def.AbstractSaveEventListener.getEntityState(AbstractSaveEventListener.java:535)
at org.hibernate.event.def.DefaultSaveOrUpdateEventListener.performSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:103)
at org.hibernate.event.def.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:93)
at org.hibernate.impl.SessionImpl.fireSaveOrUpdate(SessionImpl.java:534)
at org.hibernate.impl.SessionImpl.saveOrUpdate(SessionImpl.java:526)
at org.hibernate.engine.CascadingAction$5.cascade(CascadingAction.java:241)
at org.hibernate.engine.Cascade.cascadeToOne(Cascade.java:291)
at org.hibernate.engine.Cascade.cascadeAssociation(Cascade.java:239)
at org.hibernate.engine.Cascade.cascadeProperty(Cascade.java:192)
at org.hibernate.engine.Cascade.cascadeCollectionElements(Cascade.java:319)
at org.hibernate.engine.Cascade.cascadeCollection(Cascade.java:265)
at org.hibernate.engine.Cascade.cascadeAssociation(Cascade.java:242)
at org.hibernate.engine.Cascade.cascadeProperty(Cascade.java:192)
at org.hibernate.engine.Cascade.cascade(Cascade.java:153)
at org.hibernate.event.def.AbstractFlushingEventListener.cascadeOnFlush(AbstractFlushingEventListener.java:154)
at org.hibernate.event.def.AbstractFlushingEventListener.prepareEntityFlushes(AbstractFlushingEventListener.java:145)
at org.hibernate.event.def.AbstractFlushingEventListener.flushEverythingToExecutions(AbstractFlushingEventListener.java:88)
at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:49)
at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1027)
at org.jbpm.pvm.internal.tx.HibernateSessionResource.prepare(HibernateSessionResource.java:56)
at org.jbpm.pvm.internal.tx.StandardTransaction.commit(StandardTransaction.java:107)
at org.jbpm.pvm.internal.tx.StandardTransaction.complete(StandardTransaction.java:64)
at org.jbpm.pvm.internal.tx.StandardTransactionInterceptor.execute(StandardTransactionInterceptor.java:57)
at org.jbpm.pvm.internal.svc.EnvironmentInterceptor.executeInNewEnvironment(EnvironmentInterceptor.java:53)
at org.jbpm.pvm.internal.svc.EnvironmentInterceptor.execute(EnvironmentInterceptor.java:40)
at org.jbpm.pvm.internal.svc.RetryInterceptor.execute(RetryInterceptor.java:56)
at org.jbpm.pvm.internal.svc.SkipInterceptor.execute(SkipInterceptor.java:43)
at org.jbpm.pvm.internal.svc.ExecutionServiceImpl.startProcessInstanceByKey(ExecutionServiceImpl.java:75)
at test.Test.initStart(Test.java:52)
at test.Test.execute(Test.java:63)
at test.Test.main(Test.java:84)
小弟第一次用JBPM4.4进行开发,请各位帮帮忙看下。或者提供下如何实现线程级别的并行任务jbpmjava
现有五个任务:A、B、(C、D)、E任务,其中C和D是需要并发运行的任务2、实现:
1)、看到并发首先想到fork...join,流程图如下:<?xml version="1.0" encoding="UTF-8"?><process name="test_demo" key="test" xmlns="http://jbpm.org/4.4/jpdl">
<start g="92,7,48,48" name="start1">
<transition name="to A" to="A" g="-29,-17"/>
</start>
<task name="A" g="73,73,92,52">
<assignment-handler class="test.TestUtil">
</assignment-handler>
<transition name="to B" to="B" g="-29,-17"/>
</task>
<task name="B" g="73,150,92,52">
<assignment-handler class="test.TestUtil">
</assignment-handler>
<transition name="to fork" to="fork" g="-47,-17"/>
</task>
<fork name="fork" g="96,223,48,48">
<transition name="to C" to="C" g="-29,-17"/>
<transition name="to D" to="D" g="-29,-17"/>
</fork>
<task name="C" g="15,291,92,52">
<assignment-handler class="test.TestUtil">
<field name="isFork">
<string value="true"/>
</field>
</assignment-handler>
<transition name="to join" to="join" g="-47,-17"/>
</task>
<task name="D" g="143,289,92,52" >
<assignment-handler class="test.TestUtil">
<field name="isFork">
<string value="true"/>
</field>
</assignment-handler>
<transition name="to join" to="join" g="-47,-17"/>
</task>
<join name="join" g="104,360,48,48" multiplicity="2">
<transition name="to E" to="E" g="-29,-17"/>
</join>
<task name="E" g="80,434,92,52">
<assignment-handler class="test.TestUtil">
</assignment-handler>
<transition name="to end" to="wait" g="-29,-17"/>
</task>
<end name="end" g="105,603,48,48"/>
</process>2)、TestUtil,只有当是并行节点才加入线程中,代码如下:public class TestUtil implements AssignmentHandler {
private String isFork;
public String getIsFork() {
return isFork;
}
public void setIsFork(String isFork) {
this.isFork = isFork;
}
public void assign(Assignable asgnbl, OpenExecution oe) throws Exception {
oe.setVariable("result", "false");//设置到数据库成功
if ("true".equals(isFork){
new Thread(new ThreadTest(oe)).start();
}
}
}3)、因为fork...join标签里面的子流程是虽然是并行的,不过是串行触发而不是并发,所以我考虑把任务C和D放到线程里执行。然后根据存在数据库里的result进行线程等待处理,即线程C和D分别执行完把result设置为true,主线程才继续往下走public class ThreadTest implements Runnable { private OpenExecution oe;
public ThreadTest(OpenExecution oe) {
this.oe = oe;
}
public void run() {
//synchronized (this) {
try {
oe.setVariable("fuck", "xxxxxxxxx");//这里报错
System.out.println("result |||||" + oe.getVariable(oe.getActivity().getName()) + "####" + oe.getVariable("fuck") + "****" + oe.getActivity().getName() + "&&&&" + oe.getActivity().getType());
} catch (Exception ex) {
oe.setVariable("error", "cnm");
ex.printStackTrace();
}
// }
}
public OpenExecution getOe() {
return oe;
}
public void setOe(OpenExecution oe) {
this.oe = oe;
}
}4)、实现类Test: public ProcessEngine processEngine;
public RepositoryService repositoryService;
public ExecutionService executionService;
public TaskService taskService;
public ProcessInstance processInstance; public Test() {
processEngine = Configuration.getProcessEngine();
executionService = processEngine.getExecutionService();
taskService = processEngine.getTaskService();
} /**
* 流程发布
* @param key
* @return
*/
public String deploy(String path, String key) {
String deploymentId = processEngine.getRepositoryService().createDeployment().addResourceFromClasspath(path).deploy();
System.out.println("========流程发布:" + deploymentId);
return deploymentId;
} /**
* 根据预留的KEY启动流程
* @return
*/
public String initStart(String key) {
processInstance = executionService.startProcessInstanceByKey(key, key);
System.out.println("========流程开始:" + processInstance.getId());
return processInstance.getId();
} /**
* 流程执行
* @param key : 流程定义的KEY值
* @param activityName : 传入的流程的节点名称
*/
public void execute(String key) {
String processInstanceId = initStart(key);
while (true) {
List<Task> tasklist2 = taskService.findPersonalTasks(null);
for (Task task : tasklist2) {
if (task.getExecutionId().indexOf("test.test") == 0) {
taskService.completeTask(task.getId());
}
}
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
}
}
}
public static void main(String[] args) {
Test j = new Test();
j.deploy("test/test.jpdl.xml", "test");
j.execute("test");
}
最后:报错信息:
Exception in thread "main" org.hibernate.HibernateException: instance not of expected entity type: org.jbpm.pvm.internal.type.variable.UnpersistableVariable is not a: org.jbpm.pvm.internal.type.Variable
at org.hibernate.persister.entity.AbstractEntityPersister.getSubclassEntityPersister(AbstractEntityPersister.java:3663)
at org.hibernate.impl.SessionImpl.getEntityPersister(SessionImpl.java:1374)
at org.hibernate.engine.ForeignKeys.isTransient(ForeignKeys.java:203)
at org.hibernate.event.def.AbstractSaveEventListener.getEntityState(AbstractSaveEventListener.java:535)
at org.hibernate.event.def.DefaultSaveOrUpdateEventListener.performSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:103)
at org.hibernate.event.def.DefaultSaveOrUpdateEventListener.onSaveOrUpdate(DefaultSaveOrUpdateEventListener.java:93)
at org.hibernate.impl.SessionImpl.fireSaveOrUpdate(SessionImpl.java:534)
at org.hibernate.impl.SessionImpl.saveOrUpdate(SessionImpl.java:526)
at org.hibernate.engine.CascadingAction$5.cascade(CascadingAction.java:241)
at org.hibernate.engine.Cascade.cascadeToOne(Cascade.java:291)
at org.hibernate.engine.Cascade.cascadeAssociation(Cascade.java:239)
at org.hibernate.engine.Cascade.cascadeProperty(Cascade.java:192)
at org.hibernate.engine.Cascade.cascadeCollectionElements(Cascade.java:319)
at org.hibernate.engine.Cascade.cascadeCollection(Cascade.java:265)
at org.hibernate.engine.Cascade.cascadeAssociation(Cascade.java:242)
at org.hibernate.engine.Cascade.cascadeProperty(Cascade.java:192)
at org.hibernate.engine.Cascade.cascade(Cascade.java:153)
at org.hibernate.event.def.AbstractFlushingEventListener.cascadeOnFlush(AbstractFlushingEventListener.java:154)
at org.hibernate.event.def.AbstractFlushingEventListener.prepareEntityFlushes(AbstractFlushingEventListener.java:145)
at org.hibernate.event.def.AbstractFlushingEventListener.flushEverythingToExecutions(AbstractFlushingEventListener.java:88)
at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:49)
at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1027)
at org.jbpm.pvm.internal.tx.HibernateSessionResource.prepare(HibernateSessionResource.java:56)
at org.jbpm.pvm.internal.tx.StandardTransaction.commit(StandardTransaction.java:107)
at org.jbpm.pvm.internal.tx.StandardTransaction.complete(StandardTransaction.java:64)
at org.jbpm.pvm.internal.tx.StandardTransactionInterceptor.execute(StandardTransactionInterceptor.java:57)
at org.jbpm.pvm.internal.svc.EnvironmentInterceptor.executeInNewEnvironment(EnvironmentInterceptor.java:53)
at org.jbpm.pvm.internal.svc.EnvironmentInterceptor.execute(EnvironmentInterceptor.java:40)
at org.jbpm.pvm.internal.svc.RetryInterceptor.execute(RetryInterceptor.java:56)
at org.jbpm.pvm.internal.svc.SkipInterceptor.execute(SkipInterceptor.java:43)
at org.jbpm.pvm.internal.svc.ExecutionServiceImpl.startProcessInstanceByKey(ExecutionServiceImpl.java:75)
at test.Test.initStart(Test.java:52)
at test.Test.execute(Test.java:63)
at test.Test.main(Test.java:84)
小弟第一次用JBPM4.4进行开发,请各位帮帮忙看下。或者提供下如何实现线程级别的并行任务jbpmjava
解决方案 »
免费领取超大流量手机卡,每月29元包185G流量+100分钟通话, 中国电信官方发货