<?php
namespace Kafka\Consumer;
use Kafka\ConsumerConfig;
class Process
{
use \Psr\Log\LoggerAwareTrait;
use \Kafka\LoggerTrait;
/**
 * Consumer callable handed to the constructor, or null when none was given.
 * NOTE(review): where and with which arguments it is invoked is not visible
 * in this part of the file -- confirm at the call site.
 *
 * @var callable|null
 */
protected $consumer = null;
/**
 * Starts out as an empty array.
 * NOTE(review): presumably a buffer of fetched messages, judging by the name;
 * the code that fills and reads it is not visible here -- TODO confirm.
 *
 * @var array
 */
protected $messages = [];
/**
 * Store the optional consumer callable on the instance.
 *
 * NOTE(review): the parameter is nullable only through its "= null" default.
 * Implicit nullability is deprecated as of PHP 8.4; switch to "?callable"
 * once the minimum supported PHP version is known to be 7.1 or later.
 *
 * @param callable|null $consumer consumer callable, kept in $this->consumer
 */
public function __construct(callable $consumer = null)
{
$this->consumer = $consumer;
}
/**
 * Wire the protocol, the broker and the state object together.
 *
 * In order: initialises \Kafka\Protocol with the configured broker version
 * and this object's logger, hands the consumer config to the broker
 * singleton, registers a closure on the broker that forwards ($data, $fd)
 * to processRequest(), stores the State singleton in $this->state, passes
 * the logger on to it when one is set, registers one callback per
 * State::REQUEST_* constant and finally calls init() on the state object.
 *
 * @access public
 * @return void
 */
public function init()
{
    $consumerConfig = ConsumerConfig::getInstance();

    // Protocol layer
    \Kafka\Protocol::init($consumerConfig->getBrokerVersion(), $this->logger);

    // Broker: configuration plus the handler for incoming data
    $brokerInstance = \Kafka\Broker::getInstance();
    $brokerInstance->setConfig($consumerConfig);
    $brokerInstance->setProcess(function ($data, $fd) {
        $this->processRequest($data, $fd);
    });

    // State object
    $state = State::getInstance();
    $this->state = $state;
    if ($this->logger) {
        $state->setLogger($this->logger);
    }

    // One closure per request type; each simply delegates to the method
    // of this class that issues the corresponding request.
    $callbacks = [
        State::REQUEST_METADATA => function () {
            return $this->syncMeta();
        },
        State::REQUEST_GETGROUP => function () {
            return $this->getGroupBrokerId();
        },
        State::REQUEST_JOINGROUP => function () {
            return $this->joinGroup();
        },
        State::REQUEST_SYNCGROUP => function () {
            return $this->syncGroup();
        },
        State::REQUEST_HEARTGROUP => function () {
            return $this->heartbeat();
        },
        State::REQUEST_OFFSET => function () {
            return $this->offset();
        },
        State::REQUEST_FETCH_OFFSET => function () {
            return $this->fetchOffset();
        },
        State::REQUEST_FETCH => function () {
            return $this->fetch();
        },
        State::REQUEST_COMMIT_OFFSET => function () {
            return $this->commit();
        },
    ];
    $state->setCallback($callbacks);
    $state->init();
}
/**
 * Initialise and start the consumer.
 *
 * Runs init(), which stores the state object in $this->state, and then
 * calls start() on that state object.
 *
 * @access public
 * @return void
 */
public function start()
{
$this->init();
$this->state->start();
}
/**
 * Stop the consumer by calling stop() on the state object.
 *
 * NOTE(review): in the visible part of the file $this->state is assigned
 * only by init(), so calling stop() before init()/start() would call a
 * method on an unset property -- confirm callers never do that.
 *
 * @access public
 * @return void
 */
public function stop()
{
// TODO: we should remove the consumer from the group here
$this->state->stop();
}
/**
 * Decode a broker response and dispatch it to the matching handler.
 *
 * The first four bytes of $data are unpacked as the correlation id, which
 * is compared against the request-type constants to pick a branch. The
 * remaining bytes are decoded with \Kafka\Protocol::decode() for that
 * request type and the result is handed to the state object or to the
 * matching succ*/fail* method. An unrecognised correlation id is logged
 * through error() and otherwise ignored.
 *
 * @access protected
 * @param string $data raw response; bytes 0-3 hold the correlation id
 * @param mixed  $fd   passed through unchanged to succOffset() and succFetch()
 *                     (NOTE(review): presumably identifies the connection the
 *                     data arrived on -- confirm against the broker's setProcess())
 * @return void
 */
protected function processRequest($data, $fd)
{
    $correlationId = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
    // Everything after the 4-byte correlation id is the body to decode.
    $payload = substr($data, 4);

    switch ($correlationId) {
        case \Kafka\Protocol::METADATA_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::METADATA_REQUEST, $payload);
            if (! isset($result['brokers']) || ! isset($result['topics'])) {
                $this->error('Get metadata is fail, brokers or topics is null.');
                $this->state->failRun(\Kafka\Consumer\State::REQUEST_METADATA);
            } else {
                $broker = \Kafka\Broker::getInstance();
                $isChange = $broker->setData($result['topics'], $result['brokers']);
                $this->state->succRun(\Kafka\Consumer\State::REQUEST_METADATA, $isChange);
            }
            break;
        case \Kafka\Protocol::GROUP_COORDINATOR_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::GROUP_COORDINATOR_REQUEST, $payload);
            if (isset($result['errorCode']) && $result['errorCode'] == \Kafka\Protocol::NO_ERROR
                && isset($result['coordinatorId'])) {
                \Kafka\Broker::getInstance()->setGroupBrokerId($result['coordinatorId']);
                $this->state->succRun(\Kafka\Consumer\State::REQUEST_GETGROUP);
            } else {
                $this->state->failRun(\Kafka\Consumer\State::REQUEST_GETGROUP);
            }
            break;
        case \Kafka\Protocol::JOIN_GROUP_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $payload);
            // Read the code once behind isset(): the failure branch is reached
            // precisely when it may be missing, and indexing it directly there
            // would raise an undefined-index notice/warning.
            $errorCode = isset($result['errorCode']) ? $result['errorCode'] : null;
            if ($errorCode !== null && $errorCode == 0) {
                $this->succJoinGroup($result);
            } else {
                $this->failJoinGroup($errorCode);
            }
            break;
        case \Kafka\Protocol::SYNC_GROUP_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $payload);
            $errorCode = isset($result['errorCode']) ? $result['errorCode'] : null;
            if ($errorCode !== null && $errorCode == 0) {
                $this->succSyncGroup($result);
            } else {
                $this->failSyncGroup($errorCode);
            }
            break;
        case \Kafka\Protocol::HEART_BEAT_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::HEART_BEAT_REQUEST, $payload);
            $errorCode = isset($result['errorCode']) ? $result['errorCode'] : null;
            if ($errorCode !== null && $errorCode == 0) {
                $this->state->succRun(\Kafka\Consumer\State::REQUEST_HEARTGROUP);
            } else {
                $this->failHeartbeat($errorCode);
            }
            break;
        case \Kafka\Protocol::OFFSET_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::OFFSET_REQUEST, $payload);
            $this->succOffset($result, $fd);
            break;
        case \Kafka\Protocol\Protocol::OFFSET_FETCH_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::OFFSET_FETCH_REQUEST, $payload);
            $this->succFetchOffset($result);
            break;
        case \Kafka\Protocol\Protocol::FETCH_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::FETCH_REQUEST, $payload);
            $this->succFetch($result, $fd);
            break;
        case \Kafka\Protocol\Protocol::OFFSET_COMMIT_REQUEST:
            $result = \Kafka\Protocol::decode(\Kafka\Protocol::OFFSET_COMMIT_REQUEST, $payload);
            $this->succCommit($result);
            break;
        default:
            $this->error('Error request, correlationId:' . $correlationId);
    }
}
/**
 * Send a metadata request to one of the configured brokers.
 *
 * The comma-separated broker list from the consumer config is split,
 * trimmed and shuffled. The hosts are then tried in that order and the
 * encoded metadata request for the configured topics is written to the
 * first one for which getMetaConnect() yields a socket.
 *
 * @access protected
 * @return void
 * @throws \Kafka\Exception when the list holds no usable entry, or when no
 *                          metadata connection could be obtained for any host
 */
protected function syncMeta()
{
    $this->debug('Start sync metadata request');
    $brokerList = \Kafka\ConsumerConfig::getInstance()->getMetadataBrokerList();

    // Trim before storing so "a:9092, b:9092" yields "b:9092" rather than
    // " b:9092"; array_filter() without a callback then drops blank entries.
    $brokerHost = array_filter(array_map('trim', explode(',', $brokerList)));
    if (count($brokerHost) == 0) {
        throw new \Kafka\Exception('No valid broker configured');
    }

    // Randomise the order in which the hosts are tried.
    shuffle($brokerHost);

    $broker = \Kafka\Broker::getInstance();
    foreach ($brokerHost as $host) {
        $socket = $broker->getMetaConnect($host);
        if ($socket) {
            $params = \Kafka\ConsumerConfig::getInstance()->getTopics();
            $this->debug('Start sync metadata request params:' . json_encode($params));
            $requestData = \Kafka\Protocol::encode(\Kafka\Protocol::METADATA_REQUEST, $params);
            $socket->write($requestData);
            return;
        }
    }

    throw new \Kafka\Exception(
        sprintf(
            'It was not possible to establish a connection for metadata with the brokers "%s"',
            $brokerList
        )
    );
}
protected function getGroupBrokerId()
{
$broker = \Kafka\Broker