I am using Service Broker as my messaging system to schedule and run jobs. Each job is composed of multiple tasks or steps called engines.
My service broker objects are:
- MessageTypes: SubmitJob, JobResponse, SubmitTask, TaskResponse
- Contracts: JobContract, TaskContract
- Queues: ClientQueue, JobQueue, EngineQueue, ExternalActivatorQueue
- Services: ClientService, JobService, EngineService, ExternalActivatorService
- Event Notification: EventNotificationEngineQueue
I have internal activation (a stored procedure) on the JobQueue. For SubmitJob messages, the stored procedure gets the first task for that job, starts a dialog with the EngineService, and sends a message (StartTask) to that queue. For TaskResponse messages, it checks whether there are any more tasks for this job. If there are, the next one is submitted to the EngineQueue; if not, the tasks for this job are complete (send a message and clean up).
That all seems to be working great. However, I want an external app (engine) to process the EngineQueue messages, so I am using Microsoft's external activation mechanism (ssbeas.exe). It took a long time, but I finally got it to work: a message goes into the EngineQueue, the EventNotificationEngineQueue fires up my application, and the application drains the queue. So far so good. However, my app seems to be running multiple times. My test application is configured to send an email when it completes. Even though I only send one job with one task, I get multiple emails (indicating the program ran multiple times).
Here is the code for my app (VB.NET). oBroker is an object that encapsulates the Service Broker operations (Send, Receive, etc.):
While True
    ' Each message is received and processed inside its own transaction.
    oBroker.tran = oBroker.cnn.BeginTransaction
    oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)
    If dialogHandle = System.Guid.Empty Then
        ' No dialog handle came back, so there is nothing to process.
        'Console.WriteLine("An Error Occurred. Program Terminated.")
        oBroker.tran.Commit()
        Exit While
    End If
    ConsoleWriteLine("Received: " & msgType)
    If (msg Is Nothing) Then
        ConsoleWriteLine("commiting and exiting")
        oBroker.tran.Commit()
        Exit While
    Else
        Select Case (msgType)
            Case "SubmitTask"
                ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
                oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>")
            Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
                oBroker.EndDialog(dialogHandle)
            Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"
                oBroker.EndDialog(dialogHandle)
        End Select
    End If
    ConsoleWriteLine("commiting...")
    oBroker.tran.Commit()
End While
I don't understand why the app is running multiple times, but beyond that I don't understand why the subsequent instances are still able to see the message in the queue. After all, the first incarnation should have locked the message in the queue. It does lock the queue: I tested this by using query manager to try to receive a message while my app was running, and the receive was blocked.
I have tried playing with the concurrency values in EAService.config. When I set them to min="0" and max="1", the number of times the app seemed to run dropped to two. Previously, with min="0" and max="10", it seemed to run about 18 copies.
I hope that made sense and sorry about the length. Anyone have any ideas what is happening here? Have I made a mistake in my .net app coding?
Thanks Martin
Edit: adding log that is created after the Engine has run:
2010-02-08 09:31:39 - Main
2010-02-08 09:31:39 - Received: SubmitTask
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
2010-02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - Sending Email
2010-02-08 09:31:40 - commiting...
2010-02-08 09:31:40 - Sleeping
2010-02-08 09:32:10 - Sleeping Completed.
2010-02-08 09:32:10 - Main Complete
2010-02-08 09:32:10 - External activated application succeeds and terminates now.
2010-02-08 09:32:10 - Main
2010-02-08 09:32:10 - Received: SubmitTask
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
2010-02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - Sending Email
2010-02-08 09:32:10 - commiting...
2010-02-08 09:32:10 - Sleeping
2010-02-08 09:32:40 - Sleeping Completed.
2010-02-08 09:32:40 - Main Complete
2010-02-08 09:32:40 - External activated application succeeds and terminates now.
You can see it goes through the whole app twice (main, received, dowork, sendemail, complete.)
Edit 2: here is the latest incarnation of the stored procedure (debugging statements and all) that gets activated when a job is submitted to the queue:
ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN
DECLARE @message_type_name sysname
DECLARE @dialog uniqueidentifier
DECLARE @message_sequence_number bigint
DECLARE @error_message_sequence_number bigint
DECLARE @message_body xml
DECLARE @cgid uniqueidentifier
DECLARE @JobID int
DECLARE @Params varchar(MAX)
DECLARE @ErrorNumber bigint
DECLARE @ErrorText nvarchar(MAX)
DECLARE @TaskID int
DECLARE @TaskService varchar(100)
DECLARE @TaskKey int
DECLARE @chEngine uniqueidentifier
DECLARE @Step int
DECLARE @NextStep int
DECLARE @jobch uniqueidentifier
DECLARE @EngineMsg XML
DECLARE @TimeStarted datetime
DECLARE @TaskStatus int
-- This procedure will just sit in a loop processing Task messages in the queue
-- until the queue is empty
SET NOCOUNT ON
SET @error_message_sequence_number = -100
PRINT 'pr_ProcessJob: Start'
WHILE (1=1) BEGIN
BEGIN TRY
PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
BEGIN TRANSACTION
-- first lets get the conversation group id for the next message.
WAITFOR (
GET CONVERSATION GROUP @cgid FROM [JobQueue]
), TIMEOUT 1000
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
ROLLBACK TRANSACTION
BREAK
END
PRINT @CGID
-- Inner Loop (Message Processing)
WHILE (1=1) BEGIN
-- Receive the next available message
PRINT 'Receiving Message.'
WAITFOR (
RECEIVE top(1) -- just handle one message at a time
@message_type_name=message_type_name, --the type of message received
@message_body=CAST(message_body AS XML), -- the message contents
@message_sequence_number=message_sequence_number,
@dialog = conversation_handle -- the identifier of the dialog this message was received on
FROM [JobQueue]
WHERE conversation_group_id=@cgid
), TIMEOUT 3000 -- if the queue is empty for three seconds, give up and go away
-- If we didn't get anything, the queue is empty so bail out
IF (@@ROWCOUNT = 0) BEGIN
PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
BREAK
END --IF (@@ROWCOUNT = 0)
PRINT 'Message Received: ' + @message_type_name
SAVE TRANSACTION MessageReceivedSavePoint
-- Handle the End Conversation Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
-- When we receive an End Dialog, we need to end also.
PRINT 'ENDING CONVERSATION'
END CONVERSATION @dialog
END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
ELSE BEGIN
-- Handle the Conversation Error Message
IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
-- We can't return anything here because the dialog at the other end is closed so just log
-- an error and close our end of the conversation.
PRINT 'ENDING CONVERSATION w/Error'
END CONVERSATION @dialog
END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
ELSE BEGIN
IF (@message_type_name = 'SubmitJob') BEGIN -- Process normal Job messages..
PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'
-- Pull the information out of the task message with XQuery
SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
@Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')
PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
PRINT 'pr_ProcessJob::@Params = ' + @Params
SELECT @ErrorNumber = 0, @ErrorText = N''
-- Do something with the job
-- save state
-- we are looking for the first step
SET @Step=1
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =Task.TaskKey
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'Step='+cast(@step as varchar(max))
PRINT 'TaskID='+cast(@TaskID as varchar(max))
PRINT 'TaskService='+cast(@TaskService as varchar(max))
PRINT'TaskKey='+cast(@TaskKey as varchar(max))
PRINT 'BEGIN DIALOG with ' + @TaskService
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT CAST(@EngineMsg as varchar(max))
PRINT 'Sending Message Type SubmitTask to Engine.';
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask
(@EngineMsg)
PRINT 'Inserting into jobstate'
INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)
END -- IF (@message_type_name = 'SubmitJob')
ELSE BEGIN
IF (@message_type_name = 'TaskResponse') BEGIN
PRINT 'Processing MessageType TaskResponse'
SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')
PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))
PRINT 'Loading State'
--LoadState
SELECT @JobID=jobid,
@Step=Step,
@jobch=jobch,
@TimeStarted=sysdate
FROM Jobstate
WHERE cgid=@cgid
PRINT 'Loading State complete'
PRINT @jobch
PRINT 'Selecting from JobTask'
---------------------------------------------------------
-- Get the next task
---------------------------------------------------------
SELECT TOP 1
@TaskID=task.TaskID,
@TaskService=tt.TaskService,
@TaskKey =task.TaskKey,
@NextStep = task.Step
FROM JobTask task INNER JOIN TaskType tt
ON task.TaskTypeID = tt.TaskTypeID
WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
ORDER BY Task.step
---------------------------------------------------------
PRINT 'Selecting from JobTask: complete'
PRINT 'NextTask: ['+@TaskService+']'
if (@TaskService is null) BEGIN
PRINT '@TaskService is NULL: BEGIN'
-- no more tasks
--END CONVERSATION @jobch
PRINT 'Removing from state table'
DELETE FROM JobState
WHERE @cgid=cgid
PRINT @@ROWCOUNT
PRINT 'Removing from state table-completed'
DECLARE @ResponseDoc xml
-- Send a response message saying we're done
DECLARE @Time nvarchar(100)
SET @Time = cast(getdate() as nvarchar(100))
DECLARE @TimeStartedText nvarchar(100)
SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))
SET @ResponseDoc = N'<Job/>'
SET @ResponseDoc.modify(
'insert (<JobID>{ sql:variable("@JobID") }</JobID>,
<JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
<ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
<ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
<TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>,
<TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>)
as last into /Job [1]');
SEND ON CONVERSATION @jobch
MESSAGE TYPE [JobResponse] (@ResponseDoc)
END CONVERSATION @jobch
PRINT '@TaskService is NULL: END'
END --if (@TaskService is null) BEGIN
ELSE BEGIN
-- there are more tasks
PRINT '@TaskService is not null: BEGIN'
PRINT 'BEGIN DIALOG with ' + @TaskService
--another task
BEGIN DIALOG @chEngine
FROM SERVICE [JobService]
TO SERVICE @TaskService
ON CONTRACT [TaskContract]
WITH RELATED_CONVERSATION=@dialog;
SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);
PRINT 'SEND ' +cast(@EngineMsg as varchar(max));
SEND ON CONVERSATION @chEngine
MESSAGE TYPE SubmitTask (@EngineMsg)
PRINT 'SAVING State: ' +str(@step)
-- save state
Update JobState
SET step = @NextStep
FROM JobState
WHERE cgid=@cgid
PRINT '@TaskService is not null: END'
END -- ELSE (@TaskService is NOT NULL)
PRINT 'Processing MessageType TaskResponse...Complete'
END -- IF (@message_type_name = 'TaskResponse')
END -- ELSE (@message_type_name <> 'SubmitJob')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
END -- WHILE (1=1) BEGIN
PRINT 'COMMIT TRANSACTION'
COMMIT TRANSACTION
END TRY
BEGIN CATCH
--rollback transaction
DECLARE @ErrNum int
DECLARE @ErrMsg varchar(max)
SELECT
ERROR_NUMBER() AS ErrorNumber
,ERROR_SEVERITY() AS ErrorSeverity
,ERROR_STATE() AS ErrorState
,ERROR_PROCEDURE() AS ErrorProcedure
,ERROR_LINE() AS ErrorLine
,ERROR_MESSAGE() AS ErrorMessage;
PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'
if (error_number()=1205) BEGIN
-- a deadlock occurred. We can try it again.
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
--CONTINUE
END --if (error_number()=1205)
ELSE BEGIN
if (error_number()=9617) BEGIN
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
ROLLBACK TRANSACTION
END
ELSE BEGIN -- (error_number()<>9617)
-- another error occurred. The message can't be processed successfully
PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
ROLLBACK TRANSACTION MessageReceivedSavePoint
END --ELSE (error_number()<>9617)
END -- ELSE (error_number()<>1205)
END CATCH
END -- while loop
PRINT 'pr_ProcessJob: Complete'
END -- PROCEDURE [dbo].[pr_ProcessJob]
If the subsequent instances of your app are able to see the message, it means only one thing: the previous instance must have rolled back the receive. At first glance the code you provided looks OK, so I would look for errors in the object model you're using. If one of its methods throws an exception, the app will terminate and the transaction that received the message will be rolled back automatically.
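One way to check for such a silent rollback is to wrap each receive in a Try/Catch that logs the exception before rolling back. The following is only a minimal sketch, not the asker's broker class: it uses plain System.Data.SqlClient, and the connection string, the database name JobDb, and the module name are hypothetical placeholders. The queue name [EngineQueue] is taken from the question's description and may differ from the one your app actually reads.

```vbnet
Imports System
Imports System.Data.SqlClient

Module EngineReceiveSketch

    ' Hypothetical connection string and queue name; substitute your own.
    Private Const ConnStr As String = "Server=.;Database=JobDb;Integrated Security=True"
    Private Const ReceiveSql As String = _
        "WAITFOR (RECEIVE TOP (1) conversation_handle, message_type_name " & _
        "FROM [EngineQueue]), TIMEOUT 1000"

    Sub Main()
        Using cnn As New SqlConnection(ConnStr)
            cnn.Open()
            While True
                Dim tran As SqlTransaction = cnn.BeginTransaction()
                Try
                    Dim dialogHandle As Guid = Guid.Empty
                    Dim msgType As String = Nothing

                    ' RECEIVE runs inside the transaction, so a rollback
                    ' puts the message back on the queue.
                    Using cmd As New SqlCommand(ReceiveSql, cnn, tran)
                        Using rdr As SqlDataReader = cmd.ExecuteReader()
                            If rdr.Read() Then
                                dialogHandle = rdr.GetGuid(0)
                                msgType = rdr.GetString(1)
                            End If
                        End Using
                    End Using

                    If dialogHandle = Guid.Empty Then
                        ' Nothing arrived before the timeout: commit and stop.
                        tran.Commit()
                        Exit While
                    End If

                    Console.WriteLine("Received: " & msgType)
                    ' ... process the message and send the reply here ...

                    tran.Commit()
                Catch ex As Exception
                    ' Make the failure visible before the RECEIVE is undone.
                    Console.Error.WriteLine("Rolling back RECEIVE: " & ex.ToString())
                    Try
                        tran.Rollback()
                    Catch rollbackEx As Exception
                        Console.Error.WriteLine("Rollback failed: " & rollbackEx.Message)
                    End Try
                    Throw
                End Try
            End While
        End Using
    End Sub

End Module
```

If an exception shows up in the output just before a second activation, that would point to a rolled-back receive as the reason the same message is seen again. Writing to the application's own log file instead of the console is probably more practical when the program is launched by the external activator.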
If you want only a single instance of your app running at a time, keep the Max setting at 1; otherwise, it will by default run more instances concurrently to keep up with the load.