Kafka
Cloud Agnostic
Open Source
Distributes notifications across gateway instances via Apache Kafka topics with partition-based scaling.
Source code: Notiway.Plugins.Portable.Brokers.Kafka
Compatibility
| Pairs with | Kafka Buffer |
| Product | Notiway Notify |
Installation
Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
Infra__Plugins__Broker__Name | Yes | — | Set to Kafka |
Infra__Plugins__Broker__Version | Yes | — | Plugin version (e.g., 0.12.0) |
Infra__Plugins__Broker__Config__BootstrapServers | Yes | — | Kafka bootstrap servers (e.g., localhost:9092) |
Infra__Plugins__Broker__Config__Topic | No | notiway | Kafka topic name used for broadcasting notifications |
Infrastructure
- Running Kafka cluster — v2.8+ recommended (KRaft mode supported).
- Create the topic before starting Notiway, or enable auto-topic creation on the cluster.
Docker Compose
docker-compose.yml
services:
notiway:
image: notiway/notify:portable-0.6.0
ports:
- "5000:8080"
environment:
- Infra__Plugins__Broker__Name=Kafka
- Infra__Plugins__Broker__Version=0.12.0
- Infra__Plugins__Broker__Config__BootstrapServers=kafka:9092
- Infra__Plugins__Broker__Config__Topic=notiway
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
- KAFKA_NODE_ID=1
- KAFKA_PROCESS_ROLES=broker,controller
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- CLUSTER_ID=MkU3OEVBNTcwNTJENDM2QkUsage
Publish notifications to the Kafka topic from your backend. The message body must be the notification JSON.
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();
await producer.ProduceAsync("notiway", new Message<Null, string>
{
Value = JsonSerializer.Serialize(notification)
});const { Kafka } = require("kafkajs");
const kafka = new Kafka({ brokers: ["localhost:9092"] });
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: "notiway",
messages: [{ value: JSON.stringify(notification) }]
});from confluent_kafka import Producer
import json
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("notiway", json.dumps(notification).encode("utf-8"))
producer.flush()Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<>("notiway", objectMapper.writeValueAsString(notification)));
}#include <librdkafka/rdkafkacpp.h>
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
std::string message = /* serialize notification as JSON */;
producer->produce("notiway", RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(), nullptr, 0, 0, nullptr);
producer->flush(5000);