您好,登錄后才能下訂單哦!
本篇內容主要講解“Saga模式源碼方法教程”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Saga模式源碼方法教程”吧!
狀態機定義
以一個典型的電商購物流程為例,我們定義3個服務,訂單服務(OrderServer),賬戶服務(AccountService)和庫存服務(StorageService),這里我們把訂單服務當做聚合服務,也就是TM。
當外部下單時,訂單服務首先會創建一個訂單,然后調用賬戶服務扣減金額,最后調用庫存服務扣減庫存。這個流程入下圖:
seata的saga模式是基于狀態機來實現了,狀態機對狀態的控制需要一個JSON文件,這個JSON文件定義如下:
{ "Name": "buyGoodsOnline", "Comment": "buy a goods on line, add order, deduct account, deduct storage ", "StartState": "SaveOrder", "Version": "0.0.1", "States": { "SaveOrder": { "Type": "ServiceTask", "ServiceName": "orderSave", "ServiceMethod": "saveOrder", "CompensateState": "DeleteOrder", "Next": "ChoiceAccountState", "Input": [ "$.[businessKey]", "$.[order]" ], "Output": { "SaveOrderResult": "$.#root" }, "Status": { "#root == true": "SU", "#root == false": "FA", "$Exception{java.lang.Throwable}": "UN" } }, "ChoiceAccountState":{ "Type": "Choice", "Choices":[ { "Expression":"[SaveOrderResult] == true", "Next":"ReduceAccount" } ], "Default":"Fail" }, "ReduceAccount": { "Type": "ServiceTask", "ServiceName": "accountService", "ServiceMethod": "decrease", "CompensateState": "CompensateReduceAccount", "Next": "ChoiceStorageState", "Input": [ "$.[businessKey]", "$.[userId]", "$.[money]", { "throwException" : "$.[mockReduceAccountFail]" } ], "Output": { "ReduceAccountResult": "$.#root" }, "Status": { "#root == true": "SU", "#root == false": "FA", "$Exception{java.lang.Throwable}": "UN" }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger" } ] }, "ChoiceStorageState":{ "Type": "Choice", "Choices":[ { "Expression":"[ReduceAccountResult] == true", "Next":"ReduceStorage" } ], "Default":"Fail" }, "ReduceStorage": { "Type": "ServiceTask", "ServiceName": "storageService", "ServiceMethod": "decrease", "CompensateState": "CompensateReduceStorage", "Input": [ "$.[businessKey]", "$.[productId]", "$.[count]", { "throwException" : "$.[mockReduceStorageFail]" } ], "Output": { "ReduceStorageResult": "$.#root" }, "Status": { "#root == true": "SU", "#root == false": "FA", "$Exception{java.lang.Throwable}": "UN" }, "Catch": [ { "Exceptions": [ "java.lang.Throwable" ], "Next": "CompensationTrigger" } ], "Next": "Succeed" }, "DeleteOrder": { "Type": "ServiceTask", "ServiceName": "orderSave", "ServiceMethod": "deleteOrder", "Input": [ "$.[businessKey]", "$.[order]" ] }, "CompensateReduceAccount": { "Type": "ServiceTask", "ServiceName": "accountService", "ServiceMethod": "compensateDecrease", "Input": [ "$.[businessKey]", "$.[userId]", "$.[money]" ] }, "CompensateReduceStorage": { "Type": "ServiceTask", "ServiceName": "storageService", "ServiceMethod": "compensateDecrease", "Input": [ "$.[businessKey]", "$.[productId]", "$.[count]" ] }, "CompensationTrigger": { "Type": "CompensationTrigger", "Next": "Fail" }, "Succeed": { "Type":"Succeed" }, "Fail": { "Type":"Fail", "ErrorCode": "PURCHASE_FAILED", "Message": "purchase failed" } } }
狀態機是運行在TM中的,也就是我們上面定義的訂單服務。訂單服務創建訂單時需要開啟一個全局事務,這時就需要啟動狀態機,代碼如下:
StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine"); Map<String, Object> startParams = new HashMap<>(3); String businessKey = String.valueOf(System.currentTimeMillis()); startParams.put("businessKey", businessKey); startParams.put("order", order); startParams.put("mockReduceAccountFail", "true"); startParams.put("userId", order.getUserId()); startParams.put("money", order.getPayAmount()); startParams.put("productId", order.getProductId()); startParams.put("count", order.getCount()); //sync test StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);
可以看到,上面代碼定義的buyGoodsOnline,正是JSON文件中name的屬性值。
狀態機初始化
那上面創建訂單代碼中的stateMachineEngine這個bean是在哪里定義的呢?訂單服務的demo中有一個類StateMachineConfiguration來進行定義,代碼如下:
public class StateMachineConfiguration { @Bean public ThreadPoolExecutorFactoryBean threadExecutor(){ ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean(); threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_"); threadExecutor.setCorePoolSize(1); threadExecutor.setMaxPoolSize(20); return threadExecutor; } @Bean public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DataSource hikariDataSource) throws IOException { DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig(); dbStateMachineConfig.setDataSource(hikariDataSource); dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject()); /** *這里配置了json文件的路徑,TM在初始化的時候,會把json文件解析成StateMachineImpl類,如果數據庫沒有保存這個狀態機,則存入數據庫seata_state_machine_def表, *如果數據庫有記錄,則取最新的一條記錄,并且注冊到StateMachineRepositoryImpl, *注冊的Map有2個,一個是stateMachineMapByNameAndTenant,key格式是(stateMachineName + "_" + tenantId), *一個是stateMachineMapById,key是stateMachine.getId() *具體代碼見StateMachineRepositoryImpl類registryStateMachine方法 *這個注冊的觸發方法在DefaultStateMachineConfig的初始化方法init(),這個類是DbStateMachineConfig的父類 */ dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang/*.json"));//json文件 dbStateMachineConfig.setEnableAsync(true); dbStateMachineConfig.setApplicationId("order-server"); dbStateMachineConfig.setTxServiceGroup("my_test_tx_group"); return dbStateMachineConfig; } @Bean public ProcessCtrlStateMachineEngine stateMachineEngine(DbStateMachineConfig dbStateMachineConfig){ ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine(); stateMachineEngine.setStateMachineConfig(dbStateMachineConfig); return stateMachineEngine; } @Bean public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine){ StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder(); stateMachineEngineHolder.setStateMachineEngine(stateMachineEngine); return stateMachineEngineHolder; } }
可以看到,我們在DbStateMachineConfig中配置了狀態機的json文件,同時配置了applicationId和txServiceGroup。在DbStateMachineConfig初始化的時候,子類DefaultStateMachineConfig的init的方法會把json文件解析成狀態機,并注冊。
注冊的過程中往seata_state_machine_def這張表里插入了1條記錄,表里的content字段保存了我們的JOSON文件內容,其他字段值數據如下圖:
附:根據前面的JSON文件,我們debug跟蹤到的StateMachineImpl的內容如下:
id = null tenantId = null appName = "SEATA" name = "buyGoodsOnline" comment = "buy a goods on line, add order, deduct account, deduct storage " version = "0.0.1" startState = "SaveOrder" status = {StateMachine$Status@9135} "AC" recoverStrategy = null isPersist = true type = "STATE_LANG" content = null gmtCreate = null states = {LinkedHashMap@9137} size = 11 "SaveOrder" -> {ServiceTaskStateImpl@9153} "ChoiceAccountState" -> {ChoiceStateImpl@9155} "ReduceAccount" -> {ServiceTaskStateImpl@9157} "ChoiceStorageState" -> {ChoiceStateImpl@9159} "ReduceStorage" -> {ServiceTaskStateImpl@9161} "DeleteOrder" -> {ServiceTaskStateImpl@9163} "CompensateReduceAccount" -> {ServiceTaskStateImpl@9165} "CompensateReduceStorage" -> {ServiceTaskStateImpl@9167} "CompensationTrigger" -> {CompensationTriggerStateImpl@9169} "Succeed" -> {SucceedEndStateImpl@9171} "Fail" -> {FailEndStateImpl@9173}
啟動狀態機
在第一節創建訂單的代碼中,startWithBusinessKey方法進行了整個事務的啟動,這個方法還有一個異步模式startWithBusinessKeyAsync,這里我們只分析同步模式,源代碼如下:
public StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException { return startInternal(stateMachineName, tenantId, businessKey, startParams, false, null); } private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException { //省略部分源代碼 //創建一個狀態機實例 //默認值tenantId="000001" StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams); /** * ProcessType.STATE_LANG這個枚舉只有一個元素 * OPERATION_NAME_START = "start" * callback是null * getStateMachineConfig()返回DbStateMachineConfig */ ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG) .withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction( new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance) .withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this); Map<String, Object> contextVariables; if (startParams != null) { contextVariables = new ConcurrentHashMap<>(startParams.size()); nullSafeCopy(startParams, contextVariables); } else { contextVariables = new ConcurrentHashMap<>(); } instance.setContext(contextVariables);//把啟動參數賦值給狀態機實例的context //給ProcessContextImpl的variables加參數 contextBuilder.withStateMachineContextVariables(contextVariables); contextBuilder.withIsAsyncExecution(async); //上面定義的建造者創建一個ProcessContextImpl ProcessContext processContext = contextBuilder.build(); //這個條件是true if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) { //記錄狀態機開始狀態 stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext); } if (StringUtils.isEmpty(instance.getId())) { instance.setId( stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST)); } if (async) { stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext); } else { //發送消息到EventBus,這里的消費者是ProcessCtrlEventConsumer,在DefaultStateMachineConfig初始化時設置 stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext); } return instance; }
上面的代碼中我們可以看出,啟動狀態記得時候主要做了2件事情,一個是記錄狀態機開始的狀態,一個是發送消息到EventBus,下面我們詳細看一下這2個過程。
開啟全局事務
上面的代碼分析中,有一個記錄狀態機開始狀態的代碼,如下:
stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
這里調用了類DbAndReportTcStateLogStore的recordStateMachineStarted方法,我們來看一下,代碼如下:
public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) { if (machineInstance != null) { //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction, //use parent transaction instead. String parentId = machineInstance.getParentId(); if (StringUtils.hasLength(parentId)) { if (StringUtils.isEmpty(machineInstance.getId())) { machineInstance.setId(parentId); } } else { //走這個分支,因為沒有配置子狀態機 /** * 這里的beginTransaction就是開啟全局事務, * 這里是調用TC開啟全局事務 */ beginTransaction(machineInstance, context); } if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) { machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST)); } // save to db //dbType = "MySQL" machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams())); executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType), STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance); } }
上面executeUpdate方法在子類AbstractStore,debug一下executeUpdate這個方法可以看到,這里執行的sql如下:
INSERT INTO seata_state_machine_inst (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated) VALUES ('192.168.59.146:8091:65853497147990016', '06a098cab53241ca7ed09433342e9f07', '000001', null, '2020-10-31 17:18:24.773', '1604135904773', '{"@type":"java.util.HashMap","money":50.,"productId":1L,"_business_key_":"1604135904773","businessKey":"1604135904773", "count":1,"mockReduceAccountFail":"true","userId":1L,"order":{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50, "productId":1,"userId":1}}', 1, 'RU', '2020-10-31 17:18:24.773')
可以看到,這個全局事務記錄在了表seata_state_machine_inst,記錄的是我們啟動狀態機的參數,status記錄的狀態是"RU"也就是RUNNING。
分支事務處理
上一節我們提到,啟動狀態機后,向EventBus發了一條消息,這個消息的消費者是ProcessCtrlEventConsumer,我們看一下這個類的代碼:
public class ProcessCtrlEventConsumer implements EventConsumer<ProcessContext> { private ProcessController processController; @Override public void process(ProcessContext event) throws FrameworkException { //這里的processController是ProcessControllerImpl processController.process(event); } @Override public boolean accept(Class<ProcessContext> clazz) { return ProcessContext.class.isAssignableFrom(clazz); } public void setProcessController(ProcessController processController) { this.processController = processController; } }
ProcessControllerImpl類的process方法有2個處理邏輯,process和route,代碼如下:
public void process(ProcessContext context) throws FrameworkException { try { //這里的businessProcessor是CustomizeBusinessProcessor businessProcessor.process(context); businessProcessor.route(context); } catch (FrameworkException fex) { throw fex; } catch (Exception ex) { LOGGER.error("Unknown exception occurred, context = {}", context, ex); throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError); } }
這里的處理邏輯有些復雜,先上一張UML類圖,跟著這張圖,可以捋清楚代碼的調用邏輯:
我們先來看一下CustomizeBusinessProcessor中的process方法:
public void process(ProcessContext context) throws FrameworkException { /** *processType = {ProcessType@10310} "STATE_LANG" *code = "STATE_LANG" *message = "SEATA State Language" *name = "STATE_LANG" *ordinal = 0 */ ProcessType processType = matchProcessType(context); if (processType == null) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Process type not found, context= {}", context); } throw new FrameworkException(FrameworkErrorCode.ProcessTypeNotFound); } ProcessHandler processor = processHandlers.get(processType.getCode()); if (processor == null) { LOGGER.error("Cannot find process handler by type {}, context= {}", processType.getCode(), context); throw new FrameworkException(FrameworkErrorCode.ProcessHandlerNotFound); } //這里的是StateMachineProcessHandler processor.process(context); }
這里的代碼不好理解,我們分四步來研究。
第一步,我們看一下StateMachineProcessHandler類中process方法,這個方法代理了ServiceTaskStateHandler的process方法,代碼如下:
public void process(ProcessContext context) throws FrameworkException { /** * instruction = {StateInstruction@11057} * stateName = null * stateMachineName = "buyGoodsOnline" * tenantId = "000001" * end = false * temporaryState = null */ StateInstruction instruction = context.getInstruction(StateInstruction.class); //這里的state實現類是ServiceTaskStateImpl State state = instruction.getState(context); String stateType = state.getType(); //這里stateHandler實現類是ServiceTaskStateHandler StateHandler stateHandler = stateHandlers.get(stateType); List<StateHandlerInterceptor> interceptors = null; if (stateHandler instanceof InterceptableStateHandler) { //list上有1個元素ServiceTaskHandlerInterceptor interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors(); } List<StateHandlerInterceptor> executedInterceptors = null; Exception exception = null; try { if (interceptors != null && interceptors.size() > 0) { executedInterceptors = new ArrayList<>(interceptors.size()); for (StateHandlerInterceptor interceptor : interceptors) { executedInterceptors.add(interceptor); interceptor.preProcess(context); } } stateHandler.process(context); } catch (Exception e) { exception = e; throw e; } finally { if (executedInterceptors != null && executedInterceptors.size() > 0) { for (int i = executedInterceptors.size() - 1; i >= 0; i--) { StateHandlerInterceptor interceptor = executedInterceptors.get(i); interceptor.postProcess(context, exception); } } } }
從這個方法我們看到,代理對stateHandler.process加入了前置和后置增強,增強類是ServiceTaskHandlerInterceptor,前置后置增強分別調用了interceptor的preProcess和postProcess。
第二步,我們來看一下增強邏輯。ServiceTaskHandlerInterceptor的preProcess和postProcess方法,代碼如下:
public class ServiceTaskHandlerInterceptor implements StateHandlerInterceptor { //省略部分代碼 @Override public void preProcess(ProcessContext context) throws EngineExecutionException { StateInstruction instruction = context.getInstruction(StateInstruction.class); StateMachineInstance stateMachineInstance = (StateMachineInstance)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_INST); StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONFIG); //如果超時,修改狀態機狀態為FA if (EngineUtils.isTimeout(stateMachineInstance.getGmtUpdated(), stateMachineConfig.getTransOperationTimeout())) { String message = "Saga Transaction [stateMachineInstanceId:" + stateMachineInstance.getId() + "] has timed out, stop execution now."; EngineUtils.failStateMachine(context, exception); throw exception; } StateInstanceImpl stateInstance = new StateInstanceImpl(); Map<String, Object> contextVariables = (Map<String, Object>)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT); ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context); List<Object> serviceInputParams = null; Object isForCompensation = state.isForCompensation(); if (isForCompensation != null && (Boolean)isForCompensation) { CompensationHolder compensationHolder = CompensationHolder.getCurrent(context, true); StateInstance stateToBeCompensated = compensationHolder.getStatesNeedCompensation().get(state.getName()); if (stateToBeCompensated != null) { stateToBeCompensated.setCompensationState(stateInstance); stateInstance.setStateIdCompensatedFor(stateToBeCompensated.getId()); } else { LOGGER.error("Compensation State[{}] has no state to compensate, maybe this is a bug.", state.getName()); } //加入補償集合 CompensationHolder.getCurrent(context, true).addForCompensationState(stateInstance.getName(), stateInstance); } //省略部分代碼 stateInstance.setInputParams(serviceInputParams); if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist() && stateMachineConfig.getStateLogStore() != null) { try { //記錄一個分支事務的狀態RU到數據庫 /** *INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) *VALUES ('4fe5f602452c84ba5e88fd2ee9c13b35', '192.168.59.146:8091:65853497147990016', 'SaveOrder', 'ServiceTask', '2020-10-31 17:18:40.84', 'orderSave', *'saveOrder', null, 1, '["1604135904773",{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}]', 'RU', null, null, null) */ stateMachineConfig.getStateLogStore().recordStateStarted(stateInstance, context); } } //省略部分代碼 stateMachineInstance.putStateInstance(stateInstance.getId(), stateInstance);//放入StateMachineInstanceImpl的stateMap用于重試或交易補償 ((HierarchicalProcessContext)context).setVariableLocally(DomainConstants.VAR_NAME_STATE_INST, stateInstance);//記錄狀態后面傳給TaskStateRouter判斷全局事務結束 } @Override public void postProcess(ProcessContext context, Exception exp) throws EngineExecutionException { StateInstruction instruction = context.getInstruction(StateInstruction.class); ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context); StateMachineInstance stateMachineInstance = (StateMachineInstance)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_INST); StateInstance stateInstance = (StateInstance)context.getVariable(DomainConstants.VAR_NAME_STATE_INST); if (stateInstance == null || !stateMachineInstance.isRunning()) { LOGGER.warn("StateMachineInstance[id:" + stateMachineInstance.getId() + "] is end. stop running"); return; } StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONFIG); if (exp == null) { exp = (Exception)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION); } stateInstance.setException(exp); //設置事務狀態 decideExecutionStatus(context, stateInstance, state, exp); //省略部分代碼 Map<String, Object> contextVariables = (Map<String, Object>)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT); //省略部分代碼 context.removeVariable(DomainConstants.VAR_NAME_OUTPUT_PARAMS); context.removeVariable(DomainConstants.VAR_NAME_INPUT_PARAMS); stateInstance.setGmtEnd(new Date()); if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist() && stateMachineConfig.getStateLogStore() != null) { //更新分支事務的狀態為成功 /** * UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:18:49.919', excep = null, status = 'SU', * output_params = 'true' WHERE id = '4fe5f602452c84ba5e88fd2ee9c13b35' AND * machine_inst_id = '192.168.59.146:8091:65853497147990016' */ stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context); } //省略部分代碼 } }
從這個代碼我們能看到,分支事務執行前,封裝了一個StateInstanceImpl賦值給了ProcessContext,分支事務執行后,對這個StateInstanceImpl進行了修改,這個StateInstanceImpl有3個作用:
傳入StateMachineInstanceImpl的stateMap用于重試或交易補償
記錄了分支事務的執行情況,同時支持持久化到seata_state_inst表
傳入TaskStateRouter用作判斷全局事務結束
第三步,我們看一下被代理的方法stateHandler.process(context),正常執行邏輯中stateHandler的實現類是ServiceTaskStateHandler,代碼如下:
public void process(ProcessContext context) throws EngineExecutionException { StateInstruction instruction = context.getInstruction(StateInstruction.class); ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context); StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST); Object result; try { /** * 這里的input是我們在JSON中定義的,比如orderSave這個ServiceTask,input如下: * 0 = "1608714480316" * 1 = {Order@11271} "Order(id=null, userId=1, productId=1, count=1, payAmount=50, status=null)" * JSON中定義如下: * "Input": [ * "$.[businessKey]", * "$.[order]" * ] */ List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS); //Set the current task execution status to RU (Running) stateInstance.setStatus(ExecutionStatus.RU);//設置狀態 if (state instanceof CompensateSubStateMachineState) { //省略子狀態機的研究 } else { StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONFIG); //這里的state.getServiceType是springBean ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker( state.getServiceType()); if (serviceInvoker == null) { throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]", FrameworkErrorCode.ObjectNotExists); } if (serviceInvoker instanceof ApplicationContextAware) { ((ApplicationContextAware) serviceInvoker).setApplicationContext( stateMachineConfig.getApplicationContext()); } //這里觸發了我們在JSON中定義ServiceTask中方法,比如orderSave中的saveOrder方法 result = serviceInvoker.invoke(state, input.toArray()); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}", state.getName(), serviceName, methodName, result); } //省略部分代碼 } //省略異常處理代碼 }
可以看到,process這個方法是一個核心的業務處理,它用發射觸發了JSON中定義ServiceTask的方法,并且根據狀態觸發了Next對象,即流程中的下一個ServiceTask。
第四步,我們再看一下CustomizeBusinessProcessor的route方法,代碼如下:
public void route(ProcessContext context) throws FrameworkException { //code = "STATE_LANG" //message = "SEATA State Language" //name = "STATE_LANG" //ordinal = 0 ProcessType processType = matchProcessType(context); RouterHandler router = routerHandlers.get(processType.getCode()); //DefaultRouterHandler的route方法 router.route(context); }
我們看一下DefaultRouterHandler的route方法,代碼如下:
public void route(ProcessContext context) throws FrameworkException { try { ProcessType processType = matchProcessType(context); //這里的processRouter是StateMachineProcessRouter ProcessRouter processRouter = processRouters.get(processType.getCode()); Instruction instruction = processRouter.route(context); if (instruction == null) { LOGGER.info("route instruction is null, process end"); } else { context.setInstruction(instruction); eventPublisher.publish(context); } } catch (FrameworkException e) { throw e; } catch (Exception ex) { throw new FrameworkException(ex, ex.getMessage(), FrameworkErrorCode.UnknownAppError); } }
看一下StateMachineProcessRouter的route方法,這里也是用了代理模式,代碼如下:
public Instruction route(ProcessContext context) throws FrameworkException { StateInstruction stateInstruction = context.getInstruction(StateInstruction.class); State state; if (stateInstruction.getTemporaryState() != null) { state = stateInstruction.getTemporaryState(); stateInstruction.setTemporaryState(null); } else { //走這個分支 StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable( DomainConstants.VAR_NAME_STATEMACHINE_CONFIG); StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine( stateInstruction.getStateMachineName(), stateInstruction.getTenantId()); state = stateMachine.getStates().get(stateInstruction.getStateName()); } String stateType = state.getType(); StateRouter router = stateRouters.get(stateType); Instruction instruction = null; List<StateRouterInterceptor> interceptors = null; if (router instanceof InterceptableStateRouter) { //這里只有EndStateRouter interceptors = ((InterceptableStateRouter)router).getInterceptors();//EndStateRouterInterceptor } List<StateRouterInterceptor> executedInterceptors = null; Exception exception = null; try { //前置增量實現方法是空,這里省略代碼 instruction = router.route(context, state); } catch (Exception e) { exception = e; throw e; } finally { if (executedInterceptors != null && executedInterceptors.size() > 0) { for (int i = executedInterceptors.size() - 1; i >= 0; i--) { StateRouterInterceptor interceptor = executedInterceptors.get(i); interceptor.postRoute(context, state, instruction, exception);//結束狀態機 } } //if 'Succeed' or 'Fail' State did not configured, we must end the state machine if (instruction == null && !stateInstruction.isEnd()) { EngineUtils.endStateMachine(context); } } return instruction; }
這里的代理只實現了一個后置增強,做的事情就是結束狀態機。
下面我們來看一下StateRouter,UML類圖如下:
從UML類圖我們看到,除了EndStateRouter,只有一個TaskStateRouter了。而EndStateRouter并沒有做什么事情,因為關閉狀態機的邏輯已經由代理做了。這里我們看一下TaskStateRouter,代碼如下:
public Instruction route(ProcessContext context, State state) throws EngineExecutionException { StateInstruction stateInstruction = context.getInstruction(StateInstruction.class); if (stateInstruction.isEnd()) { //如果已經結束,直接返回 //省略代碼 } //The current CompensationTriggerState can mark the compensation process is started and perform compensation // route processing. State compensationTriggerState = (State)context.getVariable( DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE); if (compensationTriggerState != null) { //加入補償集合進行補償并返回 return compensateRoute(context, compensationTriggerState); } //There is an exception route, indicating that an exception is thrown, and the exception route is prioritized. String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE); if (StringUtils.hasLength(next)) { context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE); } else { next = state.getNext(); } //If next is empty, the state selected by the Choice state was taken. if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) { next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE); context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE); } //從當前context中取不出下一個節點了,直接返回 if (!StringUtils.hasLength(next)) { return null; } StateMachine stateMachine = state.getStateMachine(); State nextState = stateMachine.getState(next); if (nextState == null) { throw new EngineExecutionException("Next state[" + next + "] is not exits", FrameworkErrorCode.ObjectNotExists); } //獲取到下一個要流轉的狀態并且賦值給stateInstruction stateInstruction.setStateName(next); return stateInstruction; }
可以看到,route的作用是幫狀態機確定下一個流程節點,然后放入到當前的context中的stateInstruction。
到這里,我們就分析完成了狀態機的原理,ProcessControllerImpl類中。
需要注意的是,這里獲取到下一個節點后,并沒有直接處理,而是使用觀察者模式,先發送到EventBus,等待觀察者來處理,循環往復,直到EndStateRouter結束狀態機。
這里觀察者模式的Event是ProcessContext,里面包含了Instruction,而Instruction里面包含了State,這個State里面就決定了下一個處理的節點直到結束。UML類圖如下:
總結
seata中間件中的saga模式使用比較廣泛,但是代碼還是比較復雜的。我從下面幾個方面進行了梳理:
我們定義的json文件加載到了類StateMachineImpl中。
啟動狀態機,我們也就啟動了全局事務,這個普通模式啟動全局事務是一樣的,都會向TC發送消息。
處理狀態機狀態和控制狀態流轉的入口類在ProcessControllerImpl,從process方法可以跟代碼。
ProcessControllerImpl調用CustomizeBusinessProcessor的process處理當前狀態,然后調用route方法獲取到下一個節點并發送到EventBus。
saga模式額外引入了3張表,我們也可以根據跟全局事務和分支事務相關的2張表來跟蹤代碼,我之前給出的demo,如果事務成功,這2張表的寫sql按照狀態機執行順序給出一個成功sql,代碼如下:
INSERT INTO seata_state_machine_inst (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated) VALUES ('192.168.59.146:8091:65853497147990016', '06a098cab53241ca7ed09433342e9f07', '000001', null, '2020-10-31 17:18:24.773', '1604135904773', '{"@type":"java.util.HashMap","money":50.,"productId":1L,"_business_key_":"1604135904773","businessKey":"1604135904773",\"count\":1,\"mockreduceaccountfail\":\"true\","userId":1L,"order":{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}}', 1, 'RU', '2020-10-31 17:18:24.773') INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) VALUES ('4fe5f602452c84ba5e88fd2ee9c13b35', '192.168.59.146:8091:65853497147990016', 'SaveOrder', 'ServiceTask', '2020-10-31 17:18:40.84', 'orderSave', 'saveOrder', null, 1, '["1604135904773",{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}]', 'RU', null, null, null) UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:18:49.919', excep = null, status = 'SU', output_params = 'true' WHERE id = '4fe5f602452c84ba5e88fd2ee9c13b35' AND machine_inst_id = '192.168.59.146:8091:65853497147990016' INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) VALUES ('8371235cb2c66c8626e148f66123d3b4', '192.168.59.146:8091:65853497147990016', 'ReduceAccount', 'ServiceTask', '2020-10-31 17:19:00.441', 'accountService', 'decrease', null, 1, '["1604135904773",1L,50.,{"@type":"java.util.LinkedHashMap","throwException":"true"}]', 'RU', null, null, null) UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:19:09.593', excep = null, status = 'SU', output_params = 'true' WHERE id = '8371235cb2c66c8626e148f66123d3b4' AND machine_inst_id = '192.168.59.146:8091:65853497147990016' INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) VALUES ('e70a49f1eac72f929085f4e82c2b4de2', '192.168.59.146:8091:65853497147990016', 'ReduceStorage', 'ServiceTask', '2020-10-31 17:19:18.494', 'storageService', 'decrease', null, 1, '["1604135904773",1L,1,{"@type":"java.util.LinkedHashMap"}]', 'RU', null, null, null) UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:19:26.613', excep = null, status = 'SU', output_params = 'true' WHERE id = 'e70a49f1eac72f929085f4e82c2b4de2' AND machine_inst_id = '192.168.59.146:8091:65853497147990016' UPDATE seata_state_machine_inst SET gmt_end = '2020-10-31 17:19:33.581', excep = null, end_params = '{"@type":"java.util.HashMap","productId":1L,"count":1,"ReduceAccountResult":true,"mockReduceAccountFail":"true","userId":1L,"money":50.,"SaveOrderResult":true,"_business_key_":"1604135904773","businessKey":"1604135904773","ReduceStorageResult":true,"order":{"@type":"io.seata.sample.entity.Order","count":1,"id":60,"payAmount":50,"productId":1,"userId":1}}',status = 'SU', compensation_status = null, is_running = 0, gmt_updated = '2020-10-31 17:19:33.582' WHERE id = '192.168.59.146:8091:65853497147990016' and gmt_updated = '2020-10-31 17:18:24.773'
到此,相信大家對“Saga模式源碼方法教程”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。