0.12.0

Compatibility

Pairs withKafka Buffer
ProductNotiway Notify

Installation

Environment Variables

VariableRequiredDefaultDescription
Infra__Plugins__Broker__NameYesSet to Kafka
Infra__Plugins__Broker__VersionYesPlugin version (e.g., 0.12.0)
Infra__Plugins__Broker__Config__BootstrapServersYesKafka bootstrap servers (e.g., localhost:9092)
Infra__Plugins__Broker__Config__TopicNonotiwayKafka topic name used for broadcasting notifications

Infrastructure

  1. Running Kafka cluster — v2.8+ recommended (KRaft mode supported).
  2. Create the topic before starting Notiway, or enable auto-topic creation on the cluster.

Docker Compose

docker-compose.yml
services:
  notiway:
    image: notiway/notify:portable-0.5.0
    ports:
      - "5000:8080"
    environment:
      - Infra__Plugins__Broker__Name=Kafka
      - Infra__Plugins__Broker__Version=0.12.0
      - Infra__Plugins__Broker__Config__BootstrapServers=kafka:9092
      - Infra__Plugins__Broker__Config__Topic=notiway

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      - KAFKA_NODE_ID=1
      - KAFKA_PROCESS_ROLES=broker,controller
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk

Usage

Publish notifications to the Kafka topic from your backend. The message body must be the notification JSON.

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();

await producer.ProduceAsync("notiway", new Message<Null, string>
{
    Value = JsonSerializer.Serialize(notification)
});
const { Kafka } = require("kafkajs");

const kafka = new Kafka({ brokers: ["localhost:9092"] });
const producer = kafka.producer();
await producer.connect();
await producer.send({
    topic: "notiway",
    messages: [{ value: JSON.stringify(notification) }]
});
from confluent_kafka import Producer
import json

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("notiway", json.dumps(notification).encode("utf-8"))
producer.flush()
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("notiway", objectMapper.writeValueAsString(notification)));
}
#include <librdkafka/rdkafkacpp.h>

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

std::string message = /* serialize notification as JSON */;
producer->produce("notiway", RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
    const_cast<char *>(message.c_str()), message.size(), nullptr, 0, 0, nullptr);
producer->flush(5000);