I'm trying to invoke the RegisterMessageHandler method on a SubscriptionClient instance running in Android.

About 20 seconds after publishing a message to a specific topic (via an automated test), I receive several timeout exceptions in the Android app that registers the message handler on the SubscriptionClient instance.

Output window

Here's the error:

System.TimeoutException: 'timeout'

Registration

The code below instantiates a SubscriptionClient and then registers a message handler:

member x.Enable () =

    async {

        if not (isEnabled) then

            subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            else
                let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                msgOptions.AutoComplete         <- false
                msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                msgOptions.MaxConcurrentCalls   <- 1

                subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)

    } |> Async.StartAsTask

Message Handler

The message handler code is never executed. Instead, I observe several timeout exception alerts, as well as timeout messages in my output window.

Here's the code that I registered with the RegisterMessageHandler:

let processMessageAsync (message:Message) (_:CancellationToken) = 

    let json    = Encoding.UTF8.GetString(message.Body)
    ...

    subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
    Task.CompletedTask

Android Test - Fails

The following test publishes a message that causes a timeout exception in my Android app after about 20 seconds:

[<Fact>]
let ``Publish message courier-requested message to servicebus topic``() =

    // Setup
    let connectionstring = ConfigurationManager.ConnectionStrings.["servicebus_testEnv"].ConnectionString

    // Test
    async {

        let client    = TopicClient(connectionstring, "Topic.courier-requested")
        let data      = "test_data"
        let message   = Message(Encoding.UTF8.GetBytes(data))
        let courierId = "b965f552-31a4-4644-a9c6-d86dd45314c4"
        message.Label <- sprintf "courier-id(%s)" courierId

        do! client.SendAsync(message) |> Async.AwaitTask
        do! client.CloseAsync()       |> Async.AwaitTask
    }

Console Test - Passes

I'm able to run the same subscription code in a console app successfully (without any timeouts):

[<EntryPoint>]
let main argv =

    printfn "Welcome to Subscription.Console"

    async {

        let subscriber = Subscriber(connectionString)
        do! subscriber.Listen()

    } |> Async.RunSynchronously


    Console.ReadKey() |> ignore
    0 // return an integer exit code

Here's the implementation:

type Subscriber(connectionString:string) =

    let mutable subscriptionClient : SubscriptionClient = null

    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        printfn "Got an exception: %A" args.Exception
        Task.CompletedTask

    let processMessageAsync (message:Message) (_:CancellationToken) = 

        let json = Encoding.UTF8.GetString(message.Body)

        subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously
        Task.CompletedTask

    member x.Listen() =

        async {

            subscriptionClient <- new SubscriptionClient(connectionString, "Topic.courier-requested", "Subscription.all-messages")
            subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(1.0)

            let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

            if hasDefaultRule then
                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

            else

                let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                msgOptions.AutoComplete         <- false
                msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                msgOptions.MaxConcurrentCalls   <- 1

                subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)
        }

Appendix

I applied the following to my connection string:

TransportType=AmqpWebSockets;

I have referenced the following links:

https://github.com/Azure/azure-service-bus-dotnet/issues/529

Azure Service Bus message lock not being renewed?

John Lord
Scott Nimrod

1 Answer

The Android emulator wasn't connected to the internet, so the SubscriptionClient couldn't reach Service Bus and its operations timed out.
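
For anyone hitting the same symptom, a quick connectivity check before calling RegisterMessageHandler can rule this out early. The following is only a minimal sketch, not part of the original code: the helper names tryGetEndpointHost and canReach are hypothetical, it assumes the connection string contains an "Endpoint=sb://<namespace host>/" segment, and it assumes port 443 because the connection string uses TransportType=AmqpWebSockets.

```fsharp
open System
open System.Net.Sockets

/// Hypothetical helper: pulls the host name out of the
/// "Endpoint=sb://<host>/" segment of a Service Bus connection string.
let tryGetEndpointHost (connectionString: string) =
    connectionString.Split([| ';' |], StringSplitOptions.RemoveEmptyEntries)
    |> Array.tryPick (fun part ->
        let trimmed = part.Trim()
        if trimmed.StartsWith("Endpoint=", StringComparison.OrdinalIgnoreCase) then
            match Uri.TryCreate(trimmed.Substring("Endpoint=".Length), UriKind.Absolute) with
            | true, uri -> Some uri.Host
            | _ -> None
        else
            None)

/// Hypothetical helper: returns true if a TCP connection to host:port
/// can be opened within the given timeout, false on failure or timeout.
let canReach (host: string) (port: int) (timeout: TimeSpan) =
    try
        use client = new TcpClient()
        let connect = client.ConnectAsync(host, port)
        connect.Wait(timeout) && client.Connected
    with _ ->
        false
```

In Enable(), something like `tryGetEndpointHost connectionString |> Option.exists (fun h -> canReach h 443 (TimeSpan.FromSeconds 5.0))` could then gate the registration, so a missing network connection shows up as a clear failure instead of a series of TimeoutExceptions.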

Scott Nimrod