Link Search Menu Expand Document

Tagging events

To tag events you’ll need to create an Event Adapter that will wrap the event in a org.apache.pekko.persistence.journal.Tagged class with the given tags. The Tagged class will instruct pekko-persistence-postgres to tag the event with the given set of tags.

The persistence plugin will not store the Tagged class in the journal. It will strip the tags and payload from the Tagged class, and use the class only as an instruction to tag the event with the given tags and store the payload in the message field of the journal table.

package com.swissborg.example

import org.apache.pekko.persistence.journal.{ Tagged, WriteEventAdapter }
import com.swissborg.example.Person.{ LastNameChanged, FirstNameChanged, PersonCreated }

class TaggingEventAdapter extends WriteEventAdapter {
  override def manifest(event: Any): String = ""

  def withTag(event: Any, tag: String) = Tagged(event, Set(tag))

  override def toJournal(event: Any): Any = event match {
    case _: PersonCreated =>
      withTag(event, "person-created")
    case _: FirstNameChanged =>
      withTag(event, "first-name-changed")
    case _: LastNameChanged =>
      withTag(event, "last-name-changed")
    case _ => event
  }
}

The EventAdapter must be registered by adding the following to the root of application.conf Please see the demo-akka-persistence-postgres project for more information.

:warning: Please note that this library is based on Pekko, but demo uses an older version of akka-persistence-postgres and there might be inconsistencies between the documentation and the provided code

postgres-journal {
  event-adapters {
    tagging = "com.swissborg.example.TaggingEventAdapter"
  }
  event-adapter-bindings {
    "com.swissborg.example.Person$PersonCreated" = tagging
    "com.swissborg.example.Person$FirstNameChanged" = tagging
    "com.swissborg.example.Person$LastNameChanged" = tagging
  }
}

You can retrieve a subset of all events by specifying offset, or use 0L to retrieve all events with a given tag. The offset corresponds to an ordered sequence number for the specific tag. Note that the corresponding offset of each event is provided in the EventEnvelope, which makes it possible to resume the stream at a later point from a given offset.

In addition to the offset the EventEnvelope also provides persistenceId and sequenceNr for each event. The sequenceNr is the sequence number for the persistent actor with the persistenceId that persisted the event. The persistenceId + sequenceNr is an unique identifier for the event.

The returned event stream contains only events that correspond to the given tag, and is ordered by the creation time of the events. The same stream elements (in same order) are returned for multiple executions of the same query. Deleted events are not deleted from the tagged event stream.