Como criar um Kafka consumer

Uma vez que a mensagem esteja disponível no broker, é preciso que outras aplicações consumam o seu conteúdo e façam o devido processamento. Existem várias soluções já prontas, mas desta vez, finalmente vamos descobrir como criar um Kafka consumer.

Disclaimer

Como você vai ver a seguir, construir um consumer não envolve muita complexidade de código. Entretanto, existem importantes decisões arquiteturais e de design da aplicação que precisam ser tomadas antes da primeira linha de código. Como por exemplo: o que fazer quando o processamento falhar? A resposta vai depender do ambiente em que está alocada a sua aplicação (se gerenciado ou não, por exemplo) e cada escolha vai refletir diretamente nas opções de configuração do seu consumer e do próprio Kafka.

Por isso você precisa se perguntar sempre se realmente precisa de um consumer. Existem soluções prontas, de mercado, que podem salvar bastante do seu tempo. Se a tarefa do seu consumer é apenas de replicação de dados, com alguma serialização, soluções como o Debezium podem resolver o seu problema. Como uma pessoa sábia já disse: “Menos código, menos bug”. Agora, se o processamento da mensagem vai envolver regras de negócio, interação com outros sistemas, neste caso a escrita de um consumer pode realmente ser a melhor opção.

Qual o tipo de aplicação?

Na prática, você pode escrever o seu consumer utilizando qualquer um dos templates básicos do C#: Console, Worker ou API. Cada um deles oferece prós e contras, como é de se esperar.

Um console, por exemplo, te permite escrever uma aplicação bastante leve e direto ao ponto. Tanto que acaba sendo o modelo de aplicação preferido para construção de exemplos. Essa simplicidade, porém, tem um custo. Todo o boilerplate que te permite escrever códigos limpos e manuteníveis, terá de ser escrito.

As possibilidades que nos restam é o Worker e o API. O que torna essas duas opções viáveis é o uso de HostedServices. Como você sabe, HostedServices são serviços que rodam paralelamente a thread principal do sistema. Assim você não bloqueia o sistema enquanto executa determinadas tarefas. A sua utilização é muito simples: Basta que você implemente uma classe que descenda de BackgroundService e a adicione como HostedService no injetor de dependências.

public class MyParallelWorker : BackgroundService
{
     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         while (!stoppingToken.IsCancellationRequested)
         {
             try
             {
                 await MyJob();
             }
             catch (OperationCanceledException)
             {
                 return;
             }
         }
         
         Console.WriteLine("Closing application");
     }

     private async Task MyJob()
     {
         Console.WriteLine("Working in parallel");
         await Task.Delay(1000);
     }
}

...

services
     .AddHostedService<MyParallelWorker>()

Isso torna tanto o Worker quanto a API muito próximas conceitualmente. Qual seria a principal diferença, portanto? Acredito que nos requisitos não funcionais. Digamos que sua aplicação está rodando no Kubernetes e você deseja fazer leituras da saúde da aplicação, ou implementar mecanismos de curto-circuito a partir do ambiente e não da aplicação. Nesse caso, disponibilizar um endpoint para leituras de health check pode ser interessante. Embora fazer isso com aplicações worker seja possível, acredite em mim que é muito mais fácil e rápido você simplesmente abrir um templates de WebAPI e implementar as health checks necessárias.

Resumindo:

  • Console: para aplicações pequenas, pocs, exemplos e afins;
  • Worker: aplicações de qualquer tamanho e complexidade, que não precisam ser acessadas externamente via HTTP;
  • API: aplicações de qualquer tamanho e complexidade, que precisam ser acessadas externamente via HTTP;

Os exemplos que veremos a seguir utilizaram o templates Worker.

Implementando o Kafka Consumer – Worker

A classe Worker é o ponto de ignição da sua aplicação. Nela ficará clara a decisão tomada em caso de falha: Tentar manter a aplicação rodando, criando um novo escopo ou simplesmente falhar toda a aplicação e deixar que o ambiente a reinicie. A opção que tomamos aqui foi a de justamente falhar o mais rápido possível.

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.Register(TerminateApplication);

    var token = BuildToken();

    using var scope = _provider.CreateScope();
    var receiver = scope.ServiceProvider.GetRequiredService();
    var handler = scope.ServiceProvider.GetRequiredService();

    _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

    while (!token.IsCancellationRequested)
    {
        try
        {
            // await receiver.Execute(handler.Handle, token);
            await Task.Delay(1000, token);
        }
        catch (OperationCanceledException)
        {
            _cts.Cancel();
            break;
        }
        catch (Exception exception)
        {
            _logger.LogInformation(exception, exception.Message);
            _cts.Cancel();
            break;
        }
    }

    TerminateApplication();
}

Escopo

Como você pode ver, no início do método ExecuteAsync nós criamos um escopo. Por qual motivo?

Os HostedServices são adicionados no injetor de dependência como singletons. E como você sabe, não é possível adicionar objetos scoped para uma dependência singleton. Uma alternativa seria, por tanto, registrar todas as dependências como singleton ou transient.

Particularmente não gosto dessa alternativa. Me parece mais óbvio, didático e flexível configurar as dependências tendo em vista o ciclo de vida em um processamento. A analogia que faço é que o processamento em questão é de uma lista de mensagens (ou uma fila, se preferir). Para processar essa lista, preciso desses componentes, interagindo com estes ciclos de vida. Desta forma, caso eu opte por modificar a minha estratégia de resiliência, basta mudar em apenas um lugar e tudo continuará funcionando.

E quando digo didático, é que sempre haverá alguém lendo o seu código; alguém que talvez nunca tenha trabalhado com Kafka. De forma que, a maneira como o código está escrito, pode ensiná-la as melhores práticas para projetos futuros e com características diversas do seu.

Garanta que a aplicação vai terminar

Você pode imaginar que, em caso de falha no BackgroundService, a aplicação morreria junto com ele. A verdade é que não. Com ou sem o lançamento de uma exceção, quando o processamento for finalizado (se não houver um controle que o mantenha infinito), a aplicação continuará rodando mesmo sem executar serviço algum.

Por este motivo, como você pôde ver no código, há o registro de um método no stoppingToken e um novo token é criado para ser enviado nas próximas chamadas. O método TerminateApplication faz uma chamada a IHostApplicationLifetime.StopApplication(), enviando um sinal de encerramento para todas as thread, possibilitando que a aplicação seja “graciosamente” encerrada. Com isso é possível garantir que a aplicação não se transformará num zumbi.

Uma palavra sobre configuração

Não serão poucas as vezes em que você verá alguém mapeando as configurações necessárias para o Kafka para uma classe menor. Algo como:

public class ConsumerConfiguration 
{
    public string Topic { get; init; }
    public string BootstrapServer { get; init; }
}

Funciona. Eu mesmo já fiz muito isso. No entanto, algo que a experiência me trouxe, é que essas você pode precisar de mais configurações além das previstas no início da codificação. O que evidentemente necessitaria de uma recompilação do projeto. Recompilar um projeto por conta de configurações não me parece uma boa ideia.

Caso você queira ter maior poder e opções de configuração, sem a necessidade de recompilar a aplicação, você pode fazer algo como:

public class ConsumerConfiguration : ConsumerConfig
{
    private const int DefaultProcessingTimeoutMinutes = 1;
    private const int DefaultHeartbeatTimeoutSeconds = 10;

    public ConsumerConfiguration()
    {
        MaxPollIntervalMs = (int)TimeSpan.FromMinutes(DefaultProcessingTimeoutMinutes).TotalMilliseconds;
        SessionTimeoutMs = (int)TimeSpan.FromSeconds(DefaultHeartbeatTimeoutSeconds).TotalMilliseconds;
        GroupId = Guid.NewGuid().ToString();
    }

    public string Topic { get; init; }
}

Uma vez que tenha herdado da classe original, posso adicionar os valores padrões que eu desejar para a aplicação. No exemplo, temos a configuração padrão das propriedades de MaxPoll e SessionTimeout. Caso o usuário não informe esses valores no appsettings.json, os valores padrão serão assumidos.

Uma linha que está no código de exemplo e que você NUNCA DEVE FAZER NA VIDA REAL: setar o GroupId como uma GUID. Se esse código for para produção, você pode ter problemas com reprocessamento de mensagens. Se você deseja apenas facilitar o teste de mesa, envolva essa inicialização, validando o ambiente em que a aplicação está rodando.

Perceba também que eu adicionei o campo Topic, que não está originalmente nas classes de configuração. Isso não altera em nada o comportamento quando crio o meu consumer. E de quebra, ainda tenho o nome do tópico acessível. Prefiro deixar informações como o nome do tópico configuráveis para poder simular erros durante a etapa de testes.

Kafka Consumer Provider

Da mesma forma como fizemos com o Producer, também vamos criar um provider de consumers. O princípio é basicamente o mesmo: Encapsular complexidade de criação dos objetos, garantir que não teremos reconexões desnecessárias e atrelar o ciclo de vida do consumer ao provider. Uma estratégia que vimos funcionar muito bem no producer.

internal class ConsumerProvider<TKey, TValue> : IConsumerProvider<TKey, TValue>
{
    private readonly IDeserializer<TKey> _keyDeserializer;
    private readonly IDeserializer<TValue> _valueDeserializer;
    private readonly ConsumerConfiguration _configuration;
    private readonly ILogger<ConsumerProvider<TKey, TValue>> _logger;
    private IConsumer<TKey, TValue> _consumer;
    private bool _disposedValue;

    public ConsumerProvider(
        IDeserializer<TKey> keyDeserializer,
        IDeserializer<TValue> valueDeserializer,
        ConsumerConfiguration configuration,
        ILogger<ConsumerProvider<TKey, TValue>> logger)
    {
        _keyDeserializer = keyDeserializer;
        _valueDeserializer = valueDeserializer;
        _configuration = configuration;
        _logger = logger;
    }

    public IConsumer<TKey, TValue> GetConsumer() =>
        _consumer ?? BuildConsumer();

    public void Dispose()
    {
        Dispose(disposing: true);
        System.GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!_disposedValue)
        {
            if (disposing)
            {
                _consumer?.Dispose();
            }

            _disposedValue = true;
        }
    }

    private IConsumer<TKey, TValue> BuildConsumer()
    {
        if (_consumer is not null)
        {
            return _consumer;
        }

        _consumer = new ConsumerBuilder<TKey, TValue>(_configuration)
            .SetErrorHandler((_, error) => _logger.LogError(error.Reason))
            .SetLogHandler((_, logMessage) => _logger.LogInformation(logMessage.Message))
            .SetKeyDeserializer(_keyDeserializer)
            .SetValueDeserializer(_valueDeserializer)
            .Build();
        _consumer.Subscribe(_configuration.Topic);

        return _consumer;
    }
}
services
    .AddScoped(typeof(IConsumerProvider<,>), typeof(ConsumerProvider<,>))

ConsumerBuilder e desserializadores

Quero chamar a sua atenção para algo que também é possível de ser feito no Producer, mas que fiz apenas no Consumer para fins ilustrativos. Veja o trecho de código:

_consumer = new ConsumerBuilder<TKey, TValue>(_configuration)
    .SetErrorHandler((_, error) => _logger.LogError(error.Reason))
    .SetLogHandler((_, logMessage) => _logger.LogInformation(logMessage.Message))
    .SetKeyDeserializer(_keyDeserializer)
    .SetValueDeserializer(_valueDeserializer)
    .Build();
_consumer.Subscribe(_configuration.Topic);

Como pode ver, no ConsumerBuilder nós especificamos manipuladores para casos de erro e log do Kafka. Estamos apenas logando informações. Você poderia fazer algo mais complexo. Mas eu quero chamar a atenção para as duas novidades: SetKeyDeserializer e SetValueDeserializer.

A maior parte do código de exemplo disponível na internet consome a mensagem apenas como string. Isto não está errado. Todavia, há outra forma disponível para serialização/desserialização de objetos. No caso do consumer, basta definir os desserializadores para a chave o valor. Desta forma o método .Consume() não irá retornar uma string, mas sim o objeto já desserializado. Mas como escrever essas classes?

Exemplo de implementação para IDeserializer<>

Para construir de/serializadores para o Kafka é relativamente simples. Basta que você defina uma classe, implementando a interface IDeserializer<T>, onde o T é o tipo que você deseja retornar. Por exemplo:

public class SaleEventDeserializer : IDeserializer<SaleEvent>
{
    public SaleEvent Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull)
        {
            return null;
        }

        return JsonSerializer.Deserialize<SaleEvent>(data, new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
        });
    }
}
...
services
    .AddTransient(_ => Deserializers.Null)
    .AddTransient<IDeserializer<SaleEvent>, SaleEventDeserializer>()

Para o nosso caso, nós estamos desserializando o objeto SaleEvent. O parâmetro data, como é possível inferir, contém os bytes da mensagem ou da chave. O método JsonSerializer.Deserialize<> já possui uma implementação que aceita um ReadOnlySpan<byte> e por isso não é necessário nenhuma transformação no tipo. Caso você utilize a biblioteca Newtonsoft, será necessária uma chamada para System.Text.Encoding.UTF8.GetString(data) afim de obter a string.

Caso você encontre um erro na desserialização – o objeto json estiver mal formatado, por exemplo – o método Consume irá levantar uma exceção. Manipule corretamente esse tipo de erro, fazendo as manipulações necessárias (como enviar para uma deadletter queue/topic).

Consumindo as mensagens

Nós já discutimos bastante sobre o comportamento de consumers, o que nos permite ir mais rapidamente para o código:

public interface IEventReceiver
 {
     Task Execute(
         Func<IDictionary<string, string>, SaleEvent, Task> messageHandler,
         CancellationToken token = default);
 }

A interface IEventReceiver está sendo chamada lá na classe Worker, dando início ao processamento. Perceba que além do CancellationToken, ela também recebe uma função para manipular a mensagem. Desta forma, sem muitos malabarismos, é possível isolar a manipulação da mensagem em si da tratativa de comunicação com o Kafka. Veja a implementação do método IEventReceiver.Execute:

public async Task Execute(
    Func<IDictionary<string, string>, SaleEvent, Task> messageHandler,
    CancellationToken token = default)
{
    var consumerResult = await Task.Run(() => _consumer.Consume(token));

    await messageHandler(
        consumerResult.Message.Headers.ToDictionary(),
        consumerResult.Message.Value);

    _consumer.StoreOffset(consumerResult);
    _consumer.Commit();
}

Na classe Worker, é solicitada uma instância de ISaleEventHandler, cuja finalidade é ser o ponto de entrada para o processamento de domínio. Vejamos a implementação de ISaleEventHandler.Handle:

public Task Handle(IDictionary headers, SaleEvent message)
 {
     Console.WriteLine($"Sale {message.Id} processed");
     return Task.CompletedTask;
 }

Assim eu consigo garantir uma independência entre os detalhes de implementação e as regras de negócio ao distanciar o domínio do meu código de infraestrutura. Obviamente isso vai requerer uma certa maturidade no design da aplicação – em que lugar os erros serão manipulados, por exemplo – mas esse é um papo para outro momento.

Após ler esse código, no que diz respeito ao consumo das mensagens, você deve estar percebendo que poderia resolver várias coisas utilizando apenas generics, tornado suas classes mais reaproveitáveis, já que a maior parte do algoritmo será idêntico para qualquer tipo de mensagem. Isso é uma verdade. Eu apenas não fui por esse caminho para ser mais objetivo e diminuir um pouco a complexidade da nossa conversa.

Você acha que acabou? Ainda temos muitos assuntos que eu adoraria discutir com você sobre Kafka. Então fique ligado para os próximos capítulos dessa conversa.

https://github.com/ftathiago/blogdoft-toycode/tree/feature/CreateKafkaConsumer

Link para a aplicação

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.