Kalix app building.

Kalix tutorial: Building an invoice application

Scala is well known for its functional libraries, which make it possible to build complex applications for streaming data or to provide reliable solutions with effect systems. However, there are not many solutions that we could call frameworks, providing every necessary tool along with out-of-the-box integrations with databases, message brokers, etc.

In 2022, Lightbend introduced Kalix. It is a framework for building reactive systems, together with a platform that frees developers from the complexity of system infrastructure such as databases, distributed caches and message brokers. With it we get multi-language application frameworks, a runtime platform and a command line interface. This mix allows distributed systems to be built quickly using Domain Driven Design: the developer first concentrates on designing the system and describes it with protobuf definitions. From those definitions Kalix generates most of the code, which then needs to be glued together. Production deployment comes down to running a few CLI commands, and services get monitoring and backups out of the box. Given that Kalix is message oriented, its components provide Command Query Responsibility Segregation and Event Sourcing for free. A more recent addition to the framework is the Workflow component, which handles Saga patterns with ease.

In this article we will build a simple invoice application with Kalix to explore its features and concepts.

Kalix tutorial: What do we want to achieve?

Our goal is to build a simple application allowing users to manage invoices in a collaborative way. The functionalities we would like to have in the application are: 

  • Creation of invoices;
  • Deletion of invoices;
  • Editing of invoices;
  • A free way to change invoice numbers;
  • Getting invoice details;
  • Listing existing invoices;
  • Auditing invoice changes.

Ideally, invoices should be grouped by organization. For simplicity, let’s assume that we have only one organization.

Project Overview

Conceptually, the system and the components should look as follows:

At first, clients will connect to the invoice controller through HTTP and gRPC endpoints. The invoice controller action will be the only publicly visible API. Internal communication will happen only through internal gRPC APIs. The invoice controller will validate messages and be responsible for communication with the other components of the system: the Invoice Entity (an event sourced entity), the Invoice Number Entity (a value entity) and the views. The changes emitted by invoice entities will be used to build the corresponding views and to trigger “workflow” actions. The Invoice Number Entity is a special component that will ensure the uniqueness of invoice numbers within the system.

Designing the invoice

Let’s define our aggregate for the invoice first. We know that we need audit capabilities for invoice changes, so an Event Sourced Entity will be a perfect fit.

Let’s say that invoice numbers should be unique across our organization, even though our case is simpler. The natural choice of entity key would be a composite of the invoice number and the organization. Let’s look back at what we want to achieve, though.

Unfortunately, with such a key, if a user isn’t happy with the invoice number for some reason, we would need to destroy the entity and recreate it. This isn’t ideal, because other users would lose the single stable identifier through which they access and edit the document. Additionally, the newly created invoice would have to either copy all of the previous events or start from the current state of the invoice, losing some information and bringing other implications. That is why a synthetic identifier for the invoice seems the better choice, although we still need to take care of the uniqueness of the invoice number.
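
The trade-off can be illustrated with a small, framework-free sketch. The names InvoiceNumberRegistry, reserve and release are hypothetical and only stand in for the role of the Invoice Number Entity; none of this is Kalix API. The invoice keeps one synthetic id for its whole life, while the number is reserved and released separately.

```scala
import java.util.UUID

// Hypothetical, in-memory stand-in for the uniqueness check.
// Maps an invoice number to the synthetic id of the invoice that owns it.
final case class InvoiceNumberRegistry(owners: Map[String, String] = Map.empty) {

  // Reserve `number` for `invoiceId`; fail if another invoice already owns it.
  def reserve(number: String, invoiceId: String): Either[String, InvoiceNumberRegistry] =
    owners.get(number) match {
      case Some(owner) if owner != invoiceId => Left(s"Invoice number $number is already taken")
      case _                                 => Right(copy(owners = owners.updated(number, invoiceId)))
    }

  // Release a number so that it can be reused, e.g. after a renumbering.
  def release(number: String): InvoiceNumberRegistry = copy(owners = owners - number)
}

object InvoiceNumberDemo {
  // The synthetic identifier never changes, whatever happens to the number.
  val invoiceId: String = UUID.randomUUID().toString

  // Renumbering: release the old number, then reserve the new one for the same id.
  val renumbered: Either[String, InvoiceNumberRegistry] =
    for {
      first  <- InvoiceNumberRegistry().reserve("2023/001", invoiceId)
      second <- first.release("2023/001").reserve("2023/002", invoiceId)
    } yield second
}
```

With this split, changing the number never requires recreating the invoice, so its event history stays attached to one identifier.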

Let’s also define some less constrained elements of our aggregate, such as: 

  • customer details (name, tax registration number and address);
  • invoice issuance date;
  • line items (product id, details, quantity, price, vat rate);
  • invoice status.

Initializing the project

A quickstart to the project is simple. In your favorite terminal emulator call sbt new lightbend/kalix-value-entity.g8. The command will create a base Kalix project, which we will use as our invoice example project.

The structure of the project is pretty straightforward. Under /src/main/protobuf you will find all of the protobuf definitions. All of the necessary Scala implementations will be generated under /src/main/scala. For our example, it would be best to remove the Main.scala file each time a new component is added. No worries, Kalix will regenerate it for us. It is worth noting that the protobuf packages will be reflected the same way in the Scala code.

Into the protobuf definition

Now that we’ve addressed all of the necessary aspects, we’ll transfer the definitions to the protobuf declaration files. Keep in mind that the pillars of Kalix are Domain Driven Design and messaging, so the domain model is the most important thing for us now. We could go straight to code, but that would add costs such as writing custom encoders. Thanks to protobuf, we can describe our system in a declarative way and Kalix will generate everything that is needed.

So let’s describe the Value Objects in the protobuf file located at ./src/main/protobuf/io/scalac/invoice/domain/model.proto. This file will contain standard proto3 syntax with message declarations. We will keep our Value Objects in a separate file, as we’ll reach for them later on in various components of the service.

syntax = "proto3";
package io.scalac.invoice.domain;

message LineItem {
  string product_id = 1;
  string description = 2;
  int32 quantity = 3;
  double net_price = 4;
  int32 vat_rate = 5;
}

message Address {
  string street = 1;
  string city = 2;
  string state = 3;
  string zip = 4;
}

message CompanyDetails {
  string name = 1;
  string tax_registration_number = 2;
  Address customer_address = 3;
}

enum InvoiceStatus {
  INVOICE_STATUS_UNSPECIFIED = 0;
  INVOICE_STATUS_OPEN = 1;
  INVOICE_STATUS_PAID = 2;
}

Now the Value Objects are defined, but that is just the beginning. We need to define a state which can be seen as an Aggregate Object. It will be wrapped and changed by the Entity Service. Let’s put the state in a separate file at ./src/main/protobuf/io/scalac/invoice/entity/state/invoice_state.proto. We have changed the package to strongly indicate that this is very much related to our entity.

syntax = "proto3";

package io.scalac.invoice.entity.state;

import "io/scalac/invoice/domain/model.proto";

message InvoiceState {
  // invoice number, unique across the organization. Change of invoice number
  // requires creating invoice number entity to enforce its uniqueness
  string invoice_number = 1;

  // customer data
  domain.CompanyDetails customer = 2;
  
  // issuance date
  string date = 3;

  // items on invoice
  repeated domain.LineItem line_items = 4;

  // current invoice status
  domain.InvoiceStatus status = 5;
}

Please note that the state does not need to contain an identifier. It is provided, or resolved from commands, by the Kalix Entity Service that we will create later on.

Events defining state

We have defined the state as if we were going to operate on it with a Kalix Value Entity, but one of the requirements is an audit log of changes, so we need to switch to an Event Sourced Entity. That switch means we need events, which we are still lacking. Create a new file under the domain package at ./src/main/protobuf/io/scalac/invoice/domain/events.proto, where we will put all of the events that will be emitted by our entity and later replayed to rebuild the state on demand.

We do not need an id as part of the messages, because Kalix adds metadata information to the events and using that, it knows which entity they should be applied to.
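
Judging only by the fields used in the handlers later in this article, the Scala classes generated from the event messages have roughly the following shape. This is a hand-written approximation, not generated code: events whose fields are not shown in the article (for example Paid or InvoiceNumberChanged) are left out, DateChanged may carry more fields than the one shown, and InvoiceEvent is a hypothetical marker trait added only for this sketch.

```scala
// Hand-written approximation of the generated classes, with protobuf-style defaults.
final case class LineItem(
    productId: String = "",
    description: String = "",
    quantity: Int = 0,
    netPrice: Double = 0.0,
    vatRate: Int = 0
)

// Hypothetical marker trait; the generated code does not necessarily have one.
sealed trait InvoiceEvent

final case class Created(userId: String = "") extends InvoiceEvent
final case class LineItemAdded(userId: String = "", item: Option[LineItem] = None) extends InvoiceEvent
final case class DateChanged(date: String = "") extends InvoiceEvent
final case class Deleted(userId: String = "", invoiceNumber: String = "") extends InvoiceEvent

object EventShapes {
  // None of the events carries the invoice id: the entity id travels in metadata.
  val sample: List[InvoiceEvent] = List(
    Created(userId = "alice"),
    LineItemAdded(userId = "alice", item = Some(LineItem(productId = "p-1", quantity = 2))),
    Deleted(userId = "bob", invoiceNumber = "2023/001")
  )
}
```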

Commands and entity service

In Kalix we can think of the Event Sourced Entity as a service with state, which accepts commands, processes them and emits events. Commands are requests to change the state of an entity, and through them we define the API for communicating with entities. In the invoice service, an invoice is created through a dedicated create command which, if it succeeds, returns the state together with the newly generated id. Adding and removing line items, deleting an invoice, changing the invoice number, changing customer data and changing the issuance date each require a dedicated command as well.

Command definition

Placing the command messages and the service in one protobuf file is handy, because they are strongly related and used together through the service methods. The entity definition should be located at ./src/main/protobuf/io/scalac/invoice/entity/invoice_service.proto, but first let’s concentrate on the command definitions. They differ from standard protobuf messages only in the (kalix.field).id = true annotation, which is imported from the Kalix definitions. It tells the code generator that the field will be used to find or generate the entity id. There are some restrictions around defining ids: the field must be of type string, and it can only be defined once per message.

syntax = "proto3";

package io.scalac.invoice.entity;

import "google/protobuf/empty.proto";
import "io/scalac/invoice/domain/model.proto";
import "io/scalac/invoice/action/public/v1/model.proto";
import "kalix/annotations.proto";

message CreateCommand {
  string user_id = 1;
}

message AddLineItemCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
  domain.LineItem line_item = 3;
}

message ChangeCustomerDetailsCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
  domain.CompanyDetails details = 3;
}

message ChangeDateCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
  string date = 3;
}

message RemoveLineItemCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
  string product_id = 3;
}

message ChangeInvoiceNumberCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
  string invoice_number = 3;
}

message AddPaymentCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
}

message DeleteCommand {
  string invoice_id = 1 [(kalix.field).id = true];
  string user_id = 2;
}

message GetCommand {
  string invoice_id = 1 [(kalix.field).id = true];
}
// service ...

Service definition

The service is located in the same file. It must be annotated with kalix.codegen to tell Kalix which generator to use; in our case, it is an event sourced entity. We need to specify the full class name of the service, the unique type id of the entity, the state, and the events that will be used to build the state.

The methods defined after the codegen annotation define the gRPC API for the communication. They accept the command messages we defined before as their arguments and usually return an empty response.

When making design decisions, we pointed out that we want synthetic identifiers for the entities. A UUID seems to be the perfect choice, and we need to instruct Kalix to generate it for us. If we would like to generate a different key, then we should call Create again.

Notice that the methods Get(GetCommand) and Create(CreateCommand) as a result return the current state details (we can think of it as the Transfer Object) which we will define later on. We could return the internal state from the entity, but any change to the state will result in changes in all other places which depend on the entity component.

syntax = "proto3";

package io.scalac.invoice.entity;

import "google/protobuf/empty.proto";
import "io/scalac/invoice/domain/model.proto";
import "io/scalac/invoice/action/public/v1/model.proto";
import "kalix/annotations.proto";

// commands ...

service InvoiceService {
  option (kalix.codegen) = {
    event_sourced_entity: {
      name: "io.scalac.invoice.entity.InvoiceEntityService"
      type_id: "invoice"
      state: "io.scalac.invoice.entity.state.InvoiceState"
      events: [
        "io.scalac.invoice.domain.Created",
        "io.scalac.invoice.domain.LineItemAdded",
        "io.scalac.invoice.domain.LineItemRemoved",
        "io.scalac.invoice.domain.InvoiceNumberChanged",
        "io.scalac.invoice.domain.CustomerDetailsChanged",
        "io.scalac.invoice.domain.DateChanged",
        "io.scalac.invoice.domain.Paid",
        "io.scalac.invoice.domain.Deleted"
      ]
    }
  };

  rpc Create(CreateCommand) returns (action.public.v1.InvoiceDetails) {
    option (kalix.method).id_generator.algorithm = VERSION_4_UUID;
  }

  rpc AddLineItem(AddLineItemCommand) returns (google.protobuf.Empty) {}
  rpc RemoveLineItem(RemoveLineItemCommand) returns (google.protobuf.Empty) {}
  rpc ChangeDate(ChangeDateCommand) returns (google.protobuf.Empty) {}
  rpc ChangeCustomer(ChangeCustomerDetailsCommand) returns (google.protobuf.Empty) {}
  rpc ChangeInvoiceNumber(ChangeInvoiceNumberCommand) returns (google.protobuf.Empty) {}
  rpc AddPayment(AddPaymentCommand) returns (google.protobuf.Empty) {}

  rpc Delete(DeleteCommand) returns (google.protobuf.Empty) {}

  rpc Get(GetCommand) returns (action.public.v1.InvoiceDetails) {}
}

As a side note, it is worth mentioning that Kalix adds some limitations to the entity service. It can’t call other parts of the system. Therefore, it is not good to expose it to the outside world, even though we could do that.

Implementing invoice entity behavior

It now looks like we have all of the definitions we need. We can now run sbt compile to generate all of the necessary components. Kalix will generate everything for us. Now our part is to connect the dots to make our application work.

Let’s start with the entity service, which should be generated at ./src/main/scala/io/scalac/invoice/entity/InvoiceEntityService.scala. Besides the methods defined in the protobuf declaration, Kalix has generated methods to handle state changes. These react directly to the events listed in the codegen annotation.

class InvoiceEntityService(context: EventSourcedEntityContext) extends AbstractInvoiceEntityService {
  // initial state
  override def emptyState: InvoiceState = InvoiceState.defaultInstance

  // command handling
  override def create(currentState: InvoiceState, createCommand: CreateCommand): EventSourcedEntity.Effect[InvoiceDetails] = ???
  override def addLineItem(currentState: InvoiceState, addLineItemCommand: AddLineItemCommand): EventSourcedEntity.Effect[Empty] = ???
  override def changeDate(currentState: InvoiceState, changeDateCommand: ChangeDateCommand): EventSourcedEntity.Effect[Empty] = ???
  override def changeCustomer(currentState: InvoiceState, changeCustomerDetailsCommand: ChangeCustomerDetailsCommand): EventSourcedEntity.Effect[Empty] = ???
  override def removeLineItem(currentState: InvoiceState, removeLineItemCommand: RemoveLineItemCommand): EventSourcedEntity.Effect[Empty] = ???
  override def changeInvoiceNumber(currentState: InvoiceState, changeInvoiceNumberCommand: ChangeInvoiceNumberCommand): EventSourcedEntity.Effect[Empty] = ???
  override def addPayment(currentState: InvoiceState, addPaymentCommand: AddPaymentCommand): EventSourcedEntity.Effect[Empty] = ???
  override def delete(currentState: InvoiceState, deleteCommand: DeleteCommand): EventSourcedEntity.Effect[Empty] = ???
 
  // retrieving state
  override def get(currentState: InvoiceState, getCommand: GetCommand): EventSourcedEntity.Effect[InvoiceDetails] = ???

  // state change events handling
  override def created(currentState: InvoiceState, created: Created): InvoiceState = ???
  override def lineItemAdded(currentState: InvoiceState, lineItemAdded: LineItemAdded): InvoiceState = ???
  override def customerDetailsChanged(currentState: InvoiceState, customerDetailsChanged: CustomerDetailsChanged): InvoiceState = ???
  override def dateChanged(currentState: InvoiceState, dateChanged: DateChanged): InvoiceState = ???
  override def lineItemRemoved(currentState: InvoiceState, lineItemRemoved: LineItemRemoved): InvoiceState = ???
  override def invoiceNumberChanged(currentState: InvoiceState, invoiceNumberChanged: InvoiceNumberChanged): InvoiceState = ???
  override def paid(currentState: InvoiceState, paid: Paid): InvoiceState = ???
  override def deleted(currentState: InvoiceState, deleted: Deleted): InvoiceState = ???
}

The implementation deals with state, which means that we need a starting point for it. By default, protobuf messages have all of their fields “zeroed”: fields of type string, int and double default to “”, 0 and 0.0 respectively, while nested messages are wrapped in Option and default to None. This means that we can use such a default instance as the empty state of an entity.

override def emptyState: InvoiceState = InvoiceState.defaultInstance
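
To make the defaults concrete, here is a minimal sketch with hand-written case classes that mimic the behaviour described above. They are simplified stand-ins, not the generated classes: line items are reduced to plain strings, and only a few fields are modelled.

```scala
// Hand-written stand-ins that mimic how generated message classes default their fields.
final case class CompanyDetails(name: String = "", taxRegistrationNumber: String = "")

final case class InvoiceState(
    invoiceNumber: String = "",              // strings default to ""
    customer: Option[CompanyDetails] = None, // nested messages default to None
    date: String = "",
    lineItems: Seq[String] = Seq.empty       // repeated fields default to empty (simplified to strings)
)

object InvoiceState {
  // Mirrors the `defaultInstance` member used for `emptyState` above.
  val defaultInstance: InvoiceState = InvoiceState()
}
```

Because case classes compare structurally, any state that differs from the default instance in at least one field is considered non-empty.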

Creation processing

Moving on, let’s implement the create command handler. Firstly, we need to check that the state has not been changed yet. We can do that by comparing the current state with the result of the emptyState method. If the state has already changed, the invoice exists and we need to return an error effect. To do that, we call the effect builder with the error message and an optional io.grpc.Status.Code.

If the state is empty, then we can emit the io.scalac.invoice.domain.Created event. This is pretty straightforward: again we simply call the effect builder with the event object. The expected reply is an io.scalac.invoice.action.public.v1.InvoiceDetails object, whose invoiceId field is filled with the identifier taken from the event sourced entity context. The identifier has already been generated for us at this point, because we configured the generator in the method options beforehand.

  override def create(currentState: InvoiceState, createCommand: CreateCommand): EventSourcedEntity.Effect[InvoiceDetails] =
    if (currentState != emptyState)
      effects.error("Invoice already exists!", Status.Code.ALREADY_EXISTS)
    else
      effects
        .emitEvent(Created(userId = createCommand.userId))
        .thenReply(_.toDetailsView(context.entityId))

Let’s handle the emitted event. This is even simpler. Given that it has already been accepted, we just need to set the state to Open.

  override def created(currentState: InvoiceState, created: Created): InvoiceState =
    InvoiceState(status = InvoiceStatus.INVOICE_STATUS_OPEN)
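
Stripped of the SDK types, the two handlers above form a simple cycle: the command handler decides whether an event may be emitted, and the event handler applies it to the state. The following framework-free sketch models that cycle with Either in place of Kalix effects; all names are simplified stand-ins for the generated classes.

```scala
// Framework-free model of the command -> event -> state cycle.
sealed trait Status
case object Unspecified extends Status
case object Open extends Status

final case class InvoiceState(status: Status = Unspecified)
final case class Created(userId: String)

object CreateFlow {
  val emptyState: InvoiceState = InvoiceState()

  // Command handler: validates against the current state and decides on an event.
  def create(current: InvoiceState, userId: String): Either[String, Created] =
    if (current != emptyState) Left("Invoice already exists!")
    else Right(Created(userId))

  // Event handler: a pure function from the old state and an event to the new state.
  def created(current: InvoiceState, event: Created): InvoiceState =
    current.copy(status = Open)
}
```

Once the Created event has been applied, the state is no longer equal to the empty state, so a second create command for the same entity is rejected.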

Update invoice processing

Maybe a more interesting example would be to add a new line item to the invoice. Before we emit the update event, some things must be verified: 

  • State needs to be NON empty for a given identifier. If the state in this case is non empty (it will be so if CreateCommand was called before), then we can proceed. 
  • The state of the entity must be correct. For this case we can expect that the status of the invoice is Open.
  • The line item in the command should be defined. If not, let’s respond with error. 
  • When a line item is provided, we should finally produce the LineItemAdded event.

For better readability we added some private utility methods which check the current state and cancel processing when the predicates are not met. They can be seen in the repository example.

  override def addLineItem(currentState: InvoiceState, addLineItemCommand: AddLineItemCommand): EventSourcedEntity.Effect[Empty] =
    forNonEmptyState(currentState) {
      forOpen(currentState) {
        addLineItemCommand.lineItem match {
          case None => effects.error("No item given")
          case Some(lineItem) =>
            effects
              .emitEvent(
                LineItemAdded(
                  userId = addLineItemCommand.userId,
                  item   = Some(lineItem)
                )
              )
              .thenReply(_ => Empty())
        }
      }
    }
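
The utility methods themselves are not shown here, so the following is only a guess at their shape, written without the SDK: Either replaces the Kalix effect type, the error messages are invented, and the line item is reduced to a string. What it illustrates is the pattern of guards that take the guarded logic as a by-name block.

```scala
sealed trait Status
case object Unspecified extends Status
case object Open extends Status
case object Paid extends Status

final case class InvoiceState(status: Status = Unspecified, lineItems: Seq[String] = Seq.empty)

object Guards {
  type Result[A] = Either[String, A]

  val emptyState: InvoiceState = InvoiceState()

  // Runs `body` only when the entity has been created; otherwise short-circuits with an error.
  def forNonEmptyState[A](state: InvoiceState)(body: => Result[A]): Result[A] =
    if (state == emptyState) Left("Invoice does not exist") else body

  // Runs `body` only while the invoice is still open.
  def forOpen[A](state: InvoiceState)(body: => Result[A]): Result[A] =
    if (state.status == Open) body else Left("Invoice is not open")

  // The guards nest in the same way as in the handler above.
  def addLineItem(state: InvoiceState, item: Option[String]): Result[String] =
    forNonEmptyState(state) {
      forOpen(state) {
        item.toRight("No item given")
      }
    }
}
```

Because the body is passed by name, it is never evaluated when a guard fails, which keeps each handler focused on the happy path.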

The reaction to the event is a state change: we update the state by adding the line item, or by replacing the existing one with the same productId.

  override def lineItemAdded(currentState: InvoiceState, lineItemAdded: LineItemAdded): InvoiceState = {
    val lineItems = currentState.lineItems.map(item => (item.productId, item)).toMap
    val updated   = lineItems.updated(lineItemAdded.getItem.productId, lineItemAdded.getItem)
    currentState.copy(lineItems = SortedSet.from(updated.view.values).toSeq)
  }
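
The add-or-replace behaviour can be tried in isolation with a cut-down LineItem. Note that SortedSet needs an Ordering for its elements; the one used in the original project is not shown, so ordering by productId is an assumption made for this sketch.

```scala
import scala.collection.immutable.SortedSet

final case class LineItem(productId: String, quantity: Int)

object LineItems {
  // Assumed ordering: SortedSet needs one, and the article does not show which is used.
  implicit val byProductId: Ordering[LineItem] = Ordering.by((item: LineItem) => item.productId)

  // Adds `item`, replacing any existing line with the same productId.
  def upsert(current: Seq[LineItem], item: LineItem): Seq[LineItem] = {
    val byId    = current.map(i => (i.productId, i)).toMap
    val updated = byId.updated(item.productId, item)
    SortedSet.from(updated.values).toSeq
  }
}
```

Keying the intermediate map by productId is what guarantees at most one line per product, while the sorted set gives the resulting sequence a stable order.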

Deletion of an invoice

Deletion of an entity is also very simple. Again, we check the constraints and emit an event, but we also need to inform the entity that it is going to be deleted.

When deleting an entity, we have to keep a few things in mind. Deletion does not happen immediately: Kalix will keep the events and the state for around one week, because other parts of the system might still need to process the emitted events. Updates are no longer allowed after this point, but the state of the entity can still be retrieved.

  override def delete(currentState: InvoiceState, deleteCommand: DeleteCommand): EventSourcedEntity.Effect[Empty] =
    forNonEmptyState(currentState) {
      effects
        .emitEvent(
          Deleted(
            userId        = deleteCommand.userId,
            invoiceNumber = currentState.invoiceNumber
          )
        )
        .deleteEntity()
        .thenReply(_ => Empty())

    }

Logic for the controller

We have already shown some examples of how to work with the application state. Someone may think: “well, it should be fine to expose the entity itself to the outside world”. Unfortunately, we are only halfway done. We still have not covered how to handle invoice number uniqueness, how to validate inputs, or how to display the invoices and the invoice history in the system.

Controller definition

Let’s then define the controller, the window through which users communicate with our system. It could also be used to communicate with other services within the same Kalix Runtime. Let’s put its definition in ./src/main/protobuf/io/scalac/invoice/action/public/v1/invoice_api.proto. Kalix allows us to define both gRPC and REST APIs. gRPC we get out of the box; for the REST API we need to use the Google API annotations.

syntax = "proto3";

package io.scalac.invoice.action.public.v1;

import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "io/scalac/invoice/domain/model.proto";
import "io/scalac/invoice/view/v1/invoice_history.proto";
import "io/scalac/invoice/action/public/v1/model.proto";
import "kalix/annotations.proto";

message GetDetailsRequest {
  string invoice_id = 1;
  string user_id = 2;
}

message GetHistoryRequest {
  string invoice_id = 1;
  string user_id = 2;
}

message GetListRequest {
  string user_id = 1;
  string page_token = 2;
}

message CreateRequest {
  string user_id = 1;
}

message AddLineItemRequest {
  string invoice_id = 1;
  string user_id = 2;

  domain.LineItem line_item = 3;
}

message RemoveLineItemRequest {
  string invoice_id = 1;
  string user_id = 2;

  string product_id = 3;
}

message ChangeInvoiceNumberRequest {
  string invoice_id = 1;
  string user_id = 2;

  string invoice_number = 3;
}

message ChangeCustomerDetailsRequest {
  string invoice_id = 1;
  string user_id = 2;

  domain.CompanyDetails customer = 3;
}

message ChangeDateRequest {
  string invoice_id = 1;
  string user_id = 2;
  string date = 3;
}

message PayRequest {
  string invoice_id = 1;
  string user_id = 2;
}

message DeleteRequest {
  string invoice_id = 1;
  string user_id = 2;
}

service InvoiceApi {
  option (kalix.codegen) = {
    action: {}
  };

  option (kalix.service).acl = {
    allow: {principal: INTERNET}
  };

  rpc GetOne(GetDetailsRequest) returns (InvoiceDetails) {
    option (google.api.http).get = "/invoices/{invoice_id}";
  }

  rpc GetHistory(GetHistoryRequest) returns (invoice.view.v1.InvoiceHistory) {
    option (google.api.http).get = "/invoices/{invoice_id}/history";
  }

  rpc GetList(GetListRequest) returns (PaginatedShortInvoice) {
    option (google.api.http).get = "/invoices";
  }

  rpc Create(CreateRequest) returns (InvoiceDetails) {
    option (google.api.http) = {
      post: "/invoices"
      body: "*"
    };
  }

  rpc AddLineItem(AddLineItemRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      put: "/invoices/{invoice_id}/line_items"
      body: "*"
    };
  }

  rpc RemoveLineItem(RemoveLineItemRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      put: "/invoices/{invoice_id}/line_items/{product_id}/remove"
      body: "*"
    };
  }

  // other methods...

  rpc Pay(PayRequest) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/invoices/{invoice_id}/pay";
  }

  rpc Delete(DeleteRequest) returns (google.protobuf.Empty) {
    option (google.api.http) = {delete: "/invoices/{invoice_id}"};
  }
}

Actions are stateless services which allow communication with other parts of the system. Because they are stateless, they do not require message field annotations. To tell Kalix to generate a stateless action, the proper kalix.codegen option must be added to the service.

We have also added an ACL policy to allow access from the internet; without it, this would not be possible. The endpoints are exposed by the Kalix runtime automatically and no additional configuration is required. One thing worth noting is that a service method of any component (action, view, value entity or event sourced entity) can be exposed to the public. The REST API, in turn, must be defined at the method level with the google.api.http annotation. There we pair one of the HTTP methods (GET, POST, PUT…) with a URL path. The path can reference fields of the protobuf request message, which are then filled from the matching path segments.

Forwarding to the entity service

Let’s look at how to handle simple forwards to other services. We can execute the effects using the Action.Effects API or, in the simplest cases where the result type of the method is the same as that of the called component, we can use the forward function from the Action.Effects API.

It is the first time that one component of the application is calling another part of the application. In Kalix, such calls are done through gRPC protocol. Each Action service has a components field defined, through which gRPC clients can be accessed.

  override def delete(deleteRequest: DeleteRequest): Action.Effect[Empty] =
    // Forward the call to the invoice entity. The accessor name on `components`
    // is assumed to follow the name of the generated entity class.
    effects.forward(
      components.invoiceEntityService.delete(
        DeleteCommand(
          invoiceId = deleteRequest.invoiceId,
          userId    = deleteRequest.userId
        )
      )
    )

View definitions

Ok, we have forwarded a command that returns an empty response, but how can we allow users to see a list of invoices in the system? Through the entity service we can access only one entity at a time. Here comes the View. Views are designed to optimize reading the data, not only by a particular id, but also by other properties. We can think of them as the query side of CQRS. For our case, we will use them to return a list of all of the invoices in the system and to store the audit log of the invoice.

Designing a view is very similar to working with an Event Sourced Entity or a Value Entity. Kalix provides a generator which requires almost no implementation if the View uses the same state as the Value Entity. If we decide to store the View with a different structure, then we need to provide a transformation to the new state. For Event Sourced Entities user involvement is always required, because the code generator cannot know how to handle particular events.

For our case we have also extracted the messages defined for the Views into a separate file because, as we saw earlier in the article, they are used in the Invoice Entity and in the public API controller.

syntax = "proto3";

package io.scalac.invoice.view.v1;

// query optimized invoice
message ShortInvoice {
  string invoice_id = 1;
  string user_id = 2;
  optional string invoice_number = 3;
  optional string date = 4;
  optional string customer_name = 5;
  // it can not be domain.InvoiceStatus as it is not
  // supported by the Kalix query
  string status = 6;
}

Querying Views is very intuitive thanks to the SQL-like syntax used in the annotated methods. The query methods are generated by Kalix and, besides the SQL query, nothing more needs to be done. Queries support complex, nested data structures, come with handy pagination utilities and can stream results. Indexes are also created automatically, based on the query filters which are used.

However, they have a few limitations. Queries must be defined at compile time, which limits their use considerably. Text search must be done using a special text_search function which does not allow patterns. Sorting cannot be used with pagination. Joins do not work out of the box; enabling them requires support from the Kalix team.

syntax = "proto3";

package io.scalac.invoice.views.v1;

import "google/protobuf/any.proto";
import "io/scalac/invoice/domain/events.proto";
import "io/scalac/invoice/views/v1/model.proto";
import "kalix/annotations.proto";

message GetAll {
  string page_token = 1;
}

message GetAllWithStatus {
  string status = 1;
  string page_token = 2;
}

service InvoicesView {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetInvoices(GetAll) returns (PaginatedShortInvoice) {
    option (kalix.method).view.query = {query:
        "SELECT * AS invoices, next_page_token() AS next_page_token"
        " FROM invoices"
        " OFFSET page_token_offset(:page_token)"
        " LIMIT 100"
    };
  }

  rpc GetInvoicesInStatus(GetAllWithStatus) returns (PaginatedShortInvoice) {
    option (kalix.method).view.query = {query:
        "SELECT * AS invoices, next_page_token() AS next_page_token"
        " FROM invoices"
        " WHERE status = :status"
        " OFFSET page_token_offset(:page_token)"
        " LIMIT 100"
    };
  }

  // updates...
}
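
From the caller’s point of view, token-based pagination boils down to a loop that feeds each next_page_token back into the next request. The sketch below models that loop without the SDK. The case classes are simplified stand-ins whose field names follow the aliases in the queries above, and treating an empty token as "no more pages" is an assumption of this sketch.

```scala
import scala.annotation.tailrec

// Minimal stand-ins for the paginated response; field names follow the query aliases.
final case class ShortInvoice(invoiceId: String)
final case class PaginatedShortInvoice(invoices: Seq[ShortInvoice], nextPageToken: String)

object Paging {
  // Calls `fetch` with successive page tokens until the view reports no further page.
  // Assumption: an empty next page token means "no more results".
  def fetchAll(fetch: String => PaginatedShortInvoice): Seq[ShortInvoice] = {
    @tailrec
    def loop(token: String, acc: Vector[ShortInvoice]): Vector[ShortInvoice] = {
      val page = fetch(token)
      val all  = acc ++ page.invoices
      if (page.nextPageToken.isEmpty) all else loop(page.nextPageToken, all)
    }
    loop("", Vector.empty) // an empty token requests the first page
  }
}
```

In the real application the fetch function would be a call to GetInvoices; here it can be any function from a page token to a page, which also makes the loop easy to test.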

The protobuf definitions of the methods that update the view state are similar to the action definitions which we saw before for the workflows. They need to declare the source of the events, with the (kalix.method).eventing.in = {event_sourced_entity: "invoice"} annotation, and the view table they will create and update, with (kalix.method).view.update = {table: "invoices"}.

service InvoicesView {
  option (kalix.codegen) = {
    view: {}
  };

  // query methods ...
 
  rpc ProcessDateChanged(domain.DateChanged) returns (ShortInvoice) {
    option (kalix.method).eventing.in = {event_sourced_entity: "invoice"};
    option (kalix.method).view.update = {table: "invoices"};
  }
  
  // more update methods ...

  rpc ProcessDeleted(domain.Deleted) returns (ShortInvoice) {
    option (kalix.method).eventing.in = {event_sourced_entity: "invoice"};
    option (kalix.method).view.update = {table: "invoices"};
  }

  rpc IgnoreOtherEvents(google.protobuf.Any) returns (ShortInvoice) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "invoice"
      ignore: true
    };
  }
}

Updating the view is very similar to updating the entity state. Processing an event results in one of four effects: an error, a state update, deletion of the view state, or ignoring the event and leaving the view state unchanged.

class InvoicesViewImpl(context: ViewContext) extends AbstractInvoicesView {

  override def emptyState: ShortInvoice = ShortInvoice.defaultInstance

  override def processDateChanged(state: ShortInvoice, dateChanged: DateChanged): UpdateEffect[ShortInvoice] =
    effects.updateState(
      state.copy(
        date = Option.unless(dateChanged.date.isBlank())(dateChanged.date)
      )
    )

  // more update methods ...

  override def processPaid(state: ShortInvoice, paid: Paid): UpdateEffect[ShortInvoice] =
    effects.updateState(
      state.copy(
        status = InvoiceStatus.INVOICE_STATUS_PAID.name
      )
    )

  override def processDeleted(state: ShortInvoice, deleted: Deleted): UpdateEffect[ShortInvoice] =
    effects.deleteState()
}
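
To see how these handlers add up, a view row can be thought of as a fold over the entity's events. The following is a simplified model in plain Scala rather than Kalix code: the event types, the "DRAFT" and "PAID" strings and the replay helper are assumptions made for illustration, and None stands for a deleted row.

```scala
object ViewFoldSketch {
  // Simplified stand-ins for the generated view state and domain events.
  final case class ShortInvoice(id: String, date: Option[String] = None, status: String = "DRAFT")

  sealed trait Event
  final case class DateChanged(date: String) extends Event
  case object Paid extends Event
  case object Deleted extends Event

  // One update step: Some(row) mirrors updateState, None mirrors deleteState.
  def update(row: Option[ShortInvoice], event: Event): Option[ShortInvoice] =
    (row, event) match {
      case (Some(r), DateChanged(d)) =>
        Some(r.copy(date = if (d.trim.isEmpty) None else Some(d)))
      case (Some(r), Paid) => Some(r.copy(status = "PAID"))
      case (_, Deleted)    => None
      case (None, _)       => None // nothing to update once the row is gone
    }

  // Replaying the events in order yields the current view row.
  def replay(start: ShortInvoice, events: Seq[Event]): Option[ShortInvoice] =
    events.foldLeft(Option(start))(update)
}
```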

For the invoice history

However, Kalix cannot return all of the events that produced a particular event sourced entity. That is why we need another View, which stores the events as they are.

syntax = "proto3";

package io.scalac.invoice.view.v1;

import "google/protobuf/timestamp.proto";
import "io/scalac/invoice/domain/events.proto";

// history view
message InvoiceHistory {
  string invoice_id = 1;
  repeated Change changes = 2;
}

message Change {
  google.protobuf.Timestamp timestamp = 1;
  string user_id = 2;

  oneof change {
    domain.Created created = 3;
    domain.LineItemAdded line_item_added = 4;
    domain.CustomerDetailsChanged customer_details_changed = 5;
    domain.DateChanged date_changed = 6;
    domain.LineItemRemoved line_item_removed = 7;
    domain.InvoiceNumberChanged invoice_number_changed = 8;
    domain.Paid paid = 9;
  }
}

The service looks very similar to the one we defined before. Because we query by invoice_id, the service returns a single element. The important thing to note is that this view should handle all of the events defined in the entity.

syntax = "proto3";

package io.scalac.invoice.view.v1;

import "io/scalac/invoice/domain/events.proto";
import "io/scalac/invoice/view/v1/invoice_history.proto";
import "kalix/annotations.proto";

message ByInvoiceIdRequest {
  string invoice_id = 1;
}

service InvoiceHistoryView {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetByInvoiceId(ByInvoiceIdRequest) returns (InvoiceHistory) {
    option (kalix.method).view.query = {query:
        "SELECT * FROM invoice_history"
        " WHERE invoice_id = :invoice_id"
    };
  }

 // updates...
}
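
The update methods elided above all follow the same idea: wrap the incoming event in a Change and append it to the history. A minimal sketch of that step in plain Scala follows; the flattened Change with a description field is a simplification of the oneof shown earlier, and record is a hypothetical helper.

```scala
object HistorySketch {
  // Simplified stand-ins for the generated messages; `description`
  // replaces the oneof of domain events to keep the sketch short.
  final case class Change(timestampMillis: Long, userId: String, description: String)
  final case class InvoiceHistory(invoiceId: String, changes: Vector[Change] = Vector.empty)

  // Appends the change, preserving the order in which events arrived.
  def record(history: InvoiceHistory, change: Change): InvoiceHistory =
    history.copy(changes = history.changes :+ change)
}
```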

Taking care of invoice number uniqueness

If we were backed by a database server, we would simply add constraints or composite keys to preserve uniqueness across the entities. In Kalix this is a bit tricky, because the protobuf-based SDK does not yet allow composite keys to be defined. So let’s stop here for a moment.

Looking at the concepts available in a Kalix application, Views seem like a candidate for gathering information about invoice numbers that are already in use. That would be wrong: Views in Kalix are eventually consistent. They are perfect for retrieving information, but they cannot guard uniqueness, because a concurrent call may read the view before it reflects the latest write and so duplicate the invoice number.
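
The race can be reproduced with a tiny deterministic model. This is not Kalix code: Numbers is a hypothetical in-memory store whose "view" only catches up when refreshView is called, which stands in for eventual consistency.

```scala
object StaleViewSketch {
  // The view lags behind the writes until refreshView() is called.
  final class Numbers {
    private var written = Vector.empty[String] // what the entities persisted
    private var view    = Set.empty[String]    // eventually consistent projection

    def isFreeAccordingToView(n: String): Boolean = !view.contains(n)
    def write(n: String): Unit                    = written = written :+ n
    def refreshView(): Unit                       = view = written.toSet
    def duplicates: Set[String] =
      written.groupBy(identity).collect { case (n, xs) if xs.size > 1 => n }.toSet
  }

  // Two callers check the view before either write becomes visible in it.
  def twoConcurrentCallers(number: String): Set[String] = {
    val store     = new Numbers
    val aSeesFree = store.isFreeAccordingToView(number)
    val bSeesFree = store.isFreeAccordingToView(number)
    if (aSeesFree) store.write(number)
    if (bSeesFree) store.write(number)
    store.refreshView()
    store.duplicates
  }
}
```

Both callers see the number as free, both write it, and the duplicate only becomes visible after the view has caught up.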

So let’s look further for something with strong consistency guarantees. That leaves Workflow, Value Entity or Event Sourced Entity. We do not need to store an audit trail, so Event Sourced Entity does not seem to be the right choice. Let’s go with Value Entity.

Invoice Number Entity

The concept is simple. Since we cannot enforce uniqueness on the invoice itself, we must place a separate entity next to it. How can we do that? Let’s define a simple state, keyed by a unique identifier (the invoice number), which tells any concurrent caller that this invoice number is already occupied and therefore cannot be used. Let’s create the state at: ./src/main/protobuf/io/scalac/invoice/entity/state/invoice_number_state.proto

syntax = "proto3";

package io.scalac.invoice.entity.state;

enum InvoiceNumberStatus {
  INVOICE_NUMBER_STATUS_UNSPECIFIED = 0;
  INVOICE_NUMBER_STATUS_RESERVED = 1;
  INVOICE_NUMBER_STATUS_CONFIRMED = 2;
}

message InvoiceNumberState {
  InvoiceNumberStatus status = 1;
}

Then let’s define the invoice number entity service itself. It needs to guarantee that an identifier can be created only once. It also needs to record a confirmation that the identifier is indeed being used somewhere. Given that a message sent between services might not be delivered immediately, a reserved invoice number waits some time for that confirmation, after which the handle is freed. We also need to release the identifier when it is no longer used.

The definition is almost the same as for the Event Sourced Entity. The visible change is that we tell kalix.codegen to generate a value_entity, and we no longer need to define the events used to build the state: the state is updated directly by the service methods. Again, we need to define a message and annotate one field with the id annotation.

syntax = "proto3";

package io.scalac.invoice.entity;

import "google/protobuf/empty.proto";
import "kalix/annotations.proto";

message InvoiceNumber {
  string value = 1 [(kalix.field).id = true];
}

service InvoiceNumberService {
  option (kalix.codegen) = {
    value_entity: {
      name: "io.scalac.invoice.entity.InvoiceNumberEntityService"
      type_id: "invoice_number"
      state: "io.scalac.invoice.entity.state.InvoiceNumberState"
    }
  };

  rpc Create(InvoiceNumber) returns (google.protobuf.Empty) {}
  rpc Confirm(InvoiceNumber) returns (google.protobuf.Empty) {}
  rpc Expire(InvoiceNumber) returns (google.protobuf.Empty) {}
  rpc Free(InvoiceNumber) returns (google.protobuf.Empty) {}
}

The implementation

On creation we first check that the invoice_number is not blank, as a blank number would cause further problems; emptiness should be handled on the caller side. Then, as we saw before, we check that the current state is empty. If both checks pass, the state is updated with the reserved status.

Confirmation is pretty straightforward, but expiration and freeing deserve some explanation. In theory we could replace those two methods with a delete. However, that would cause one big issue: the entity would not be available for around a week. That is why the state is reset to empty instead. Note that expiration leaves an already confirmed number untouched.

class InvoiceNumberEntityService(context: ValueEntityContext) extends AbstractInvoiceNumberEntityService {

  override def emptyState: InvoiceNumberState =
    InvoiceNumberState.defaultInstance

  override def create(
    currentState: InvoiceNumberState,
    invoiceNumber: InvoiceNumber
  ): ValueEntity.Effect[Empty] =
    if (invoiceNumber.value.isBlank())
      effects.error("Invoice number is empty!", Status.Code.INVALID_ARGUMENT)
    else if (currentState != emptyState)
      effects.error(s"Invoice number ${invoiceNumber.value} already exists!", Status.Code.ALREADY_EXISTS)
    else
      effects
        .updateState(
          currentState.copy(
            status = InvoiceNumberStatus.INVOICE_NUMBER_STATUS_RESERVED
          )
        )
        .thenReply(Empty())

  override def confirm(
    currentState: InvoiceNumberState,
    invoiceNumber: InvoiceNumber
  ): ValueEntity.Effect[Empty] =
    effects
      .updateState(
        currentState.copy(
          status = InvoiceNumberStatus.INVOICE_NUMBER_STATUS_CONFIRMED
        )
      )
      .thenReply(Empty())

  // we do not remove the entity yet because it would be inaccessible for ~week
  override def expire(
    currentState: InvoiceNumberState,
    invoiceNumber: InvoiceNumber
  ): ValueEntity.Effect[Empty] =
    currentState.status match {
      case INVOICE_NUMBER_STATUS_CONFIRMED => effects.reply(Empty())
      case _                               => effects.updateState(emptyState).thenReply(Empty())
    }

  // we do not remove the entity yet because it would be inaccessible for ~week
  override def free(
    currentState: InvoiceNumberState,
    invoiceNumber: InvoiceNumber
  ): ValueEntity.Effect[Empty] =
    effects
      .updateState(emptyState)
      .thenReply(Empty())

}
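
Stripped of the Kalix effects, the entity is a small state machine. The sketch below restates the transitions from the implementation above as pure functions, which makes them easy to check in isolation; the Status objects and the Either-based result are simplifications introduced here, not generated code.

```scala
object InvoiceNumberModel {
  sealed trait Status
  case object Unspecified extends Status // empty state: the number is free
  case object Reserved    extends Status
  case object Confirmed   extends Status

  // Mirrors create: reject blank numbers and numbers that are already taken.
  def create(current: Status, number: String): Either[String, Status] =
    if (number.trim.isEmpty) Left("Invoice number is empty!")
    else if (current != Unspecified) Left(s"Invoice number $number already exists!")
    else Right(Reserved)

  // Mirrors confirm: the status is overwritten unconditionally.
  def confirm(current: Status): Status = Confirmed

  // Mirrors expire: only unconfirmed numbers are released.
  def expire(current: Status): Status = current match {
    case Confirmed => Confirmed
    case _         => Unspecified
  }

  // Mirrors free: the state is reset instead of deleting the entity.
  def free(current: Status): Status = Unspecified
}
```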

How to use the Invoice Number Entity?

To use it correctly we need a few more components. We cannot call the InvoiceNumberEntityService from within the InvoiceEntityService, as Kalix does not allow entities to communicate with each other. This means the call has to move to the controller we defined before.

Invoice number reservation

To reserve the invoice number, we simply call the create method of the InvoiceNumberEntityService. If the invoice number is free, it is created without failure. If it has already been taken, a failure is returned, which rejects the whole command and forces the user to choose a different invoice number. Creating the reservation also leads to an expiration timer being set, which frees the invoice number if no service confirms it before the timeout.

Note that Scala Futures are involved here. We need to be careful not to call the changeInvoiceNumber method in parallel with the reservation: it must happen only after the reservation call has succeeded. Otherwise we could end up with a duplicate invoice number in our system.

class InvoiceApiAction(creationContext: ActionCreationContext) 
  extends AbstractInvoiceApiAction {

  override def changeInvoiceNumber(
    changeInvoiceNumberRequest: ChangeInvoiceNumberRequest
  ): Action.Effect[Empty] = {
    val newInvoiceNumber = InvoiceNumber(
      value = changeInvoiceNumberRequest.invoiceNumber
    )
    effects.asyncReply {
      for {
        _ <- components.invoiceNumberEntityService.create(newInvoiceNumber).execute()
        resp <- components.invoiceEntityService
          .changeInvoiceNumber(
            ChangeInvoiceNumberCommand(
              invoiceId     = changeInvoiceNumberRequest.invoiceId,
              userId        = changeInvoiceNumberRequest.userId,
              invoiceNumber = newInvoiceNumber.value
            )
          )
          .execute()
      } yield resp
    }
  }



  // other methods ...
}
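
The ordering guarantee comes from the for-comprehension itself: the second call is only created after the first Future has completed successfully. The sketch below shows this with plain scala.concurrent Futures; reserve and change are hypothetical stand-ins for the two component calls.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SequencingSketch {
  // `change` is invoked only if `reserve` succeeds, because it is
  // called inside the flatMap that the for-comprehension desugars to.
  def reserveThenChange(
    reserve: () => Future[Unit],
    change: () => Future[String]
  ): Future[String] =
    for {
      _    <- reserve()
      resp <- change()
    } yield resp

  // Returns (overall success, whether the change call was ever made).
  def run(reservationSucceeds: Boolean): (Boolean, Boolean) = {
    val changeCalled = new AtomicBoolean(false)
    val reserve = () =>
      if (reservationSucceeds) Future.successful(())
      else Future.failed[Unit](new IllegalStateException("already exists"))
    val change = () => Future { changeCalled.set(true); "changed" }
    val result = Try(Await.result(reserveThenChange(reserve, change), 5.seconds))
    (result.isSuccess, changeCalled.get)
  }
}
```

Had both Futures been created before the for-comprehension, they would have started independently, and the invoice could have been changed even though the reservation failed.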

Action workflows to keep the state consistent

The unique invoice number has been assigned but, as we have already said, we need to confirm that it has been used. To do this, we create another Action service, which listens for invoice change events and confirms or frees the invoice number. Let’s place it at:  ./src/main/protobuf/io/scalac/invoice/action/invoice_workflow.proto The methods in the service are annotated with (kalix.method).eventing.in = { event_sourced_entity: "invoice" }, which tells Kalix to deliver the events coming from our invoice entity. In this way, once the invoice number changes in the invoice, the invoice number entity is called to confirm the new identifier and free the old one.

syntax = "proto3";

package io.scalac.invoice.actions;

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "io/scalac/invoice/domain/events.proto";
import "kalix/annotations.proto";

service InvoiceWorkflow {
  option (kalix.codegen) = {
    action: {}
  };

  rpc OnInvoiceNumberChanged(domain.InvoiceNumberChanged) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {event_sourced_entity: "invoice"};
  }

  rpc OnDeleted(domain.Deleted) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {event_sourced_entity: "invoice"};
  }

  rpc IgnoreOtherEvents(google.protobuf.Any) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "invoice"
      ignore: true
    };
  }
}

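One important thing to note is that Kalix gives an at-least-once delivery guarantee for these calls. A failed call is retried, so at some point the invoice numbers will be confirmed or freed. It also means a handler may run more than once for the same event. Confirm and free simply overwrite the status, so an immediate repeat leaves the same state.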

class InvoiceWorkflowAction(creationContext: ActionCreationContext) extends AbstractInvoiceWorkflowAction {

  // NOTE: In the case of a future failure the event action will be repeated
  // meaning that invoiceNumber at some point will be confirmed and the oldInvoiceNumber will be available again
  override def onInvoiceNumberChanged(invoiceNumberChanged: InvoiceNumberChanged): Action.Effect[Empty] =
    invoiceNumberChanged match {
      case InvoiceNumberChanged(_, invoiceNumber, oldInvoiceNumber, _) =>
        effects.asyncReply(
          for {
            _ <- Future.sequence(maybeConfirmFuture(invoiceNumber).toSeq)
            _ <- Future.sequence(maybeFreeInvoiceNumberFuture(oldInvoiceNumber).toSeq)
          } yield Empty()
        )
      case _ => // in case of event base empty
        effects.ignore
    }

  // NOTE: In the case of a future failure the event action will be repeated
  // meaning that invoiceNumber at some point will be available again
  override def onDeleted(deleted: Deleted): Action.Effect[Empty] =
    deleted match {
      case Deleted(_, invoiceNumber, unknownFields) if invoiceNumber.nonEmpty =>
        effects.forward(components.invoiceNumberEntityService.free(InvoiceNumber(invoiceNumber)))
      case _ =>
        effects.ignore
    }

  // other methods...
}
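
Because an event may be delivered more than once, it helps to check that handling it twice in a row gives the same result as handling it once. The model below does that in plain Scala; Registry is a hypothetical map from invoice number to a "confirmed" flag, standing in for the invoice number entities.

```scala
object RedeliverySketch {
  // invoice number -> confirmed?  (a missing key means the number is free)
  type Registry = Map[String, Boolean]

  // Confirms the new number and frees the old one, like onInvoiceNumberChanged.
  def onInvoiceNumberChanged(
    registry: Registry,
    newNumber: Option[String],
    oldNumber: Option[String]
  ): Registry = {
    val confirmed = newNumber.fold(registry)(n => registry.updated(n, true))
    oldNumber.fold(confirmed)(n => confirmed - n)
  }
}
```

Applying the same event twice in a row produces the same registry, which is what makes a retry harmless in this model. It does not cover a late redelivery that arrives after the old number has been reserved again by someone else.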

To handle the expiration of an invoice number, we should also listen to the Invoice Number Entity state changes. Listening is important for two reasons: to start the expiration timer and to stop it. We start the timer when the invoice number status is reserved and cancel it when the status changes to confirmed.

class InvoiceNumberWorkflowAction(creationContext: ActionCreationContext) extends AbstractInvoiceNumberWorkflowAction {

  override def onInvoiceNumberChange(invoiceNumberState: InvoiceNumberState): Action.Effect[Empty] = {
    val invoiceNumber = contextForComponents.metadata.get("ce-subject").get
    val timerFuture = invoiceNumberState.status match {
      case INVOICE_NUMBER_STATUS_CONFIRMED => 
        timers.cancel(s"timer-$invoiceNumber")
      case INVOICE_NUMBER_STATUS_RESERVED =>
        timers.startSingleTimer(
          name         = s"timer-$invoiceNumber",
          delay        = 30.minutes,
          deferredCall = 
            components.invoiceNumberEntityService.expire(InvoiceNumber(invoiceNumber))
        )
      case _ => Future.successful(Done) // expired or freed
    }
    effects.asyncReply(timerFuture.map(_ => Empty()))
  }
}
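
The timer is addressed by name, which is why the same s"timer-$invoiceNumber" string is used for both starting and cancelling. The following model captures just that bookkeeping with a logical clock; it is a sketch under the assumption that a timer is fully identified by its name, and Timers here is a hypothetical class, not the Kalix timer API.

```scala
object TimerSketch {
  // name -> logical time at which the timer should fire
  final case class Timers(pending: Map[String, Long] = Map.empty) {
    // Starting a timer under an existing name replaces the earlier one.
    def start(name: String, fireAt: Long): Timers = copy(pending.updated(name, fireAt))

    // Cancelling an unknown name is a no-op.
    def cancel(name: String): Timers = copy(pending - name)

    // Returns the names due at `now` together with the remaining timers.
    def due(now: Long): (Set[String], Timers) = {
      val (fired, rest) = pending.partition { case (_, at) => at <= now }
      (fired.keySet, copy(rest))
    }
  }
}
```

A reserved number that is confirmed in time never reaches expire, because its timer was cancelled first; an unconfirmed one is released when the timer fires.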

Running

That is it, we have reached the end. If you would like to try this example out, it is available HERE. In the article we have skipped many simple method implementations to keep it easier to follow.

To run the service locally, call sbt runAll in the terminal from the project root. The API endpoint should then be available on port 9000, so you can use any gRPC or HTTP client to test the system behavior.

What is missing in the example

We have not yet covered Kalix service deployment or security. An automated GitHub pipeline is also something the project might need.

Thoughts on Kalix usage

Kalix provides a very nice, simple way to build distributed, operations-free, reactive systems. Throughout the whole example we did not have to think about setting up any database or message broker; we just concentrated on the design with protobuf and on a simple Scala implementation. Once we deploy the application, it is backed by monitoring, something which often takes a lot of effort to integrate into infrastructure and application.

The infrastructure is built for us for free, but the runtime can only work with Kalix applications. We cannot deploy other kinds of applications to it, as they would not know how to work with Kalix. They need to be deployed somewhere else and configured together with the Kalix operations team to provide the best performance.

We cannot query the data as freely as we would like, due to View limitations and the fact that the gRPC API is the only way to retrieve the data. This might be problematic in some scenarios.

Kalix possibilities

Kalix forces a good approach to system design, where we need to think directly about consistency and availability. We have to bear in mind that at-least-once message processing will eventually lead to the correct application state. The Effect API is also different for every component type, which steers us towards the correct approach. On the other hand, as we have seen, the provided tools are not ideal and we can have problems with basic concepts like uniqueness. That adds another level of complication, where developers need to think and act carefully.

The case classes generated from the protobuf definitions are, on the one hand, perfect: serialization works out of the box, no custom encoders and decoders are needed, and with the proper tools we can detect changes that are not backward compatible. Unfortunately, they remove type safety from the Scala code, as every field has to be a simple type, which can easily be misassigned between objects. Not only that, but the classes’ apply methods default all of the parameters to empty strings, zeroes and empty values. As a result, we may miss the fact that new fields were introduced, and the compiler will not inform us about it.
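
Both pitfalls are easy to demonstrate. The case class below only imitates the shape of a generated message (every field a simple type with a default); CreateInvoice, its fields and the wrapper types are hypothetical and serve only as an illustration.

```scala
object DefaultsSketch {
  // Shape similar to a generated message: all fields are plain
  // strings and every parameter has a default value.
  final case class CreateInvoice(
    invoiceId: String = "",
    userId: String = "",
    currency: String = "" // imagine this field was added later
  )

  // Hand-written wrappers are one way to restore some type safety
  // at the boundary of our own code.
  final case class InvoiceId(value: String)
  final case class UserId(value: String)

  // Compiles even though `currency` is never set: the default
  // silently fills the gap.
  def toCommand(invoiceId: InvoiceId, userId: UserId): CreateInvoice =
    CreateInvoice(invoiceId = invoiceId.value, userId = userId.value)
}
```

Calling CreateInvoice("u-1", "inv-1") with the arguments swapped also compiles, since both are strings, whereas toCommand rejects swapped arguments at compile time.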

As we have seen, Kalix gives us a nice way to design systems as services. This gives us a fast way to build applications from zero to production ready with monitoring and with no operation costs. Kalix services are reactive, robust and fault tolerant. There are still some gaps to be covered and things to simplify, but overall the experience is very good.

Authors

Bartosz Budnik
23.04.2024 / By  Bartosz Budnik