Apache Kafka is a fast, real-time, distributed, fault-tolerant message broker.
With Kafka you can feed continuously generated streaming data into the cluster, for example a history of website visits, financial transactions, online shopping orders, application logs, and so on. This information helps you understand what is happening with the data right now, build recommendations, apply machine learning, or aggregate data for further analysis. All of this takes seconds or minutes instead of hours and days.
You can read more in the official documentation.
1. Download the code
First, download and untar the release archive:
wget http://mirror.linux-ia64.org/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -xzf kafka_2.12-2.4.0.tgz
cd kafka_2.12-2.4.0
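Note that Kafka needs a Java runtime to run (Java 8 or newer for the 2.x releases). You can check what is installed with:

java -version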
2. Start the server
Kafka uses ZooKeeper, so first start a single-node ZooKeeper instance using the script packaged with Kafka.
bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server.
bin/kafka-server-start.sh config/server.properties
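Both scripts run in the foreground, so either start each of them in its own console, or pass the -daemon flag accepted by the bundled start scripts to run them in the background:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties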
Now we can create a topic named "test" with a single partition and one replica.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
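To verify that the topic was created, describe it with the same tool:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test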
3. Install PHP client
There are several PHP clients for Kafka. In this tutorial I use https://github.com/weiboad/kafka-php because I have already used it in production and it showed better performance than some other PHP clients. Note, however, that this client is no longer supported by its authors, so for new projects you may want to consider a different client.
composer require nmred/kafka-php
4. Create consumer
Create a file Consumer.php and set the broker list to 127.0.0.1:9092, since our Kafka cluster runs locally with a single broker (node).
<?php

require 'vendor/autoload.php';

// Consumer configuration: local broker, consumer group "test", topic "test"
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);

$consumer = new \Kafka\Consumer();
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});
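In practice you usually need only the message payload rather than the full dump. As a small sketch, assuming the client passes the payload under a nested 'message' => ['value' => ...] key (check the var_dump() output for your client version, the structure may differ), the callback could be written as:

$consumer->start(function ($topic, $part, $message) {
    // Assumed structure; adjust the keys to match what var_dump() shows
    $value = $message['message']['value'] ?? null;
    printf("topic=%s partition=%d value=%s\n", $topic, $part, $value);
});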
5. Create producer
A producer can work in two modes: asynchronous and synchronous. Let's try both.
First, create a file Producer.php with the following content for the async mode.
<?php

require 'vendor/autoload.php';

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);

$producer = new \Kafka\Producer(
    function () {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.',
                'key'   => 'testkey',
            ],
        ];
    }
);
$producer->success(function ($result) {
    var_dump($result);
});
$producer->error(function ($errorCode) {
    var_dump($errorCode);
});
$producer->send(true);
And create a file ProducerSync.php for the sync mode, where we send 100 messages to Kafka in a loop.
<?php

require 'vendor/autoload.php';

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);

$producer = new \Kafka\Producer();

for ($i = 0; $i < 100; $i++) {
    $producer->send([
        [
            'topic' => 'test',
            'value' => 'test... message #' . $i,
            'key'   => '',
        ],
    ]);
}
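Message values are plain strings, so structured data has to be serialized before it is sent. A common approach is to json_encode() the payload in the producer and json_decode() it in the consumer. As an illustration (the order_id and amount fields are made up for this example), the send() call above could become:

$producer->send([
    [
        'topic' => 'test',
        // Hypothetical payload, serialized to a JSON string
        'value' => json_encode(['order_id' => $i, 'amount' => 9.99]),
        'key'   => '',
    ],
]);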
6. Let's send messages
OK, we have everything we need. Start our consumer by running the following in a console:
php Consumer.php
Note that it can take some time before the consumer begins to receive our messages.
Now try to send messages by running the producers in a new console tab:
php Producer.php
And
php ProducerSync.php
And as we can see, all messages arrive at our consumer, which is exactly what we want!
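If the messages do not show up in the PHP consumer, you can rule out problems on the producer side by reading the topic with the console consumer that ships with Kafka:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning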
What's next?
We have a single-node cluster with a single partition and one replica. Starting a few more broker instances does not take many changes; just see the example in the official documentation.
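As a quick illustration of what that involves (following the multi-broker example in the official quickstart), a second broker only needs its own id, port and log directory:

cp config/server.properties config/server-1.properties

Then edit config/server-1.properties:

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

And start it alongside the first one:

bin/kafka-server-start.sh config/server-1.properties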