Skip to main content

Unable to send messages to Kafka broker from python.

Good day.

I have configured Kafka to run on docker swarm, when I try to use Python to
send messages, I am being presented with the error messages such as the
ones below in rapid succession, non-stop.
What changes should I make so that I am able to send messages to the broker?



ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=1
host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error
111. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection.
KafkaConnectionError: 111 ECONNREFUSED
WARNING:kafka.client:Node 1 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092
[('127.0.0.1', 9092) IPv4]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=1
host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error
111. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection.
KafkaConnectionError: 111 ECONNREFUSED
WARNING:kafka.client:Node 1 connection failed -- refreshing metadata
exit();INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092
[('::1', 9092, 0, 0) IPv6]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=1
host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error
111. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092
<connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection.
KafkaConnectionError: 111 ECONNREFUSED



Below is the python script I am using to connect.
from kafka import KafkaProducer;
import json;
import logging;
logging.basicConfig(level=logging.INFO);

producer = KafkaProducer(
bootstrap_servers=["127.0.0.1:9089"] # Replace with your Kafka broker
address
,request_timeout_ms=2000 #two seconds timeout
,retries=3
,value_serializer=lambda v: json.dumps(v).encode("utf-8")
);
data = {
"event_type": "user_login",
"user_id": 123,
"timestamp": "2025-07-29T15:30:00Z"
}
topic_name = "topic1:1:1";

producer.send(topic_name, data);
producer.flush();



Below are the contents of the docker compose file I use to construct a
docker stack which is subsequently deployed on docker swarm.

---
version: "3.9"

services:

zookeeper:
image: confluentinc/cp-zookeeper:latest
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: apache/kafka:4.0.0
environment:
# Configure listeners for both docker and host communication
-
"KAFKA_LISTENERS=CONTROLLER://localhost:9091,HOST://
0.0.0.0:9092,DOCKER://0.0.0.0:9093"
-

"KAFKA_ADVERTISED_LISTENERS=HOST://localhost:9092,DOCKER://kafka:9093"
-

"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
-
"KAFKA_NODE_ID=1"
-
"KAFKA_BROKER_ID=1"
-
"KAFKA_PROCESS_ROLES=broker,controller"
-
"KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER"
-
"KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9091"
-
"KAFKA_INTER_BROKER_LISTENER_NAME=DOCKER"
-
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1"
-
"KAFKA_LOG_DIR=/local/data/kafka/data"
-
"KAFKA_PORT=30000"
-
"KAFKA_CREATE_TOPICS=topic1:1:1"
-
"KAFKA_ZOOKEPER_CONNECT=zookeeper:2181"
ports:
-
"0.0.0.0:9089:9092/tcp"
networks:
-
"nf_genome_annot-network-kafka"
volumes:
-
type: bind
source:
/local/data/kafka_data/nf_genome_annot/nf_genome_annot_kafka-4-0-0/nf_genome_annot_kafka-4-0-0_kafka_9092.data
target: /local/data/kafka/data
read_only: false
deploy:
labels:
service.description: "This service runs the
nf_genome_annot_kafka-4-0-0 application for 'kafka' task"
mode: replicated
replicas: 1
placement:
max_replicas_per_node: 1
update_config:
parallelism: 1
delay: 10s
resources:
limits:
cpus: "2"
memory: "2G"
reservations:
cpus: "1"
memory: "1G"
restart_policy:
condition: on-failure
delay: 30s
max_attempts: 1
window: 30s

kafka-ui:
image: kafbat/kafka-ui:main
ports:
-
"0.0.0.0:8079:8080/tcp"
environment:
-
"DYNAMIC_CONFIG_ENABLED=true"
-
"KAFKA_CLUSTERS_0_NAME=local"
-
"KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093"


networks:
nf_genome_annot-network-kafka:
external: true

Comments