<?php defined('SYSPATH') or die('No direct script access.');
class Socket_Server_Core {
protected $process_store, $clients, $pm;
// Helpers
protected static $instance;
public static function factory($config = array())
{
return new Socket_Server($config);
}
public static function instance($config = array())
{
static $instance;
// Load the Socket Server instance
empty($instance) and $instance = new Socket_Server($config);
return $instance;
}
public function __construct($config = array())
{
$config += Kohana::config('socket_server');
$this->config = $config;
if ($this->config['enabled'] !== TRUE)
{
throw new Kohana_Exception('socket_server.disabled');
}
$this->clients = 0;
$this->pm = new Process_Manager();
self::$instance = $this;
}
public function run()
{
ob_end_flush();
ob_implicit_flush(TRUE);
Socket_Server::stdout(Kohana::lang('socket_server.info_startup', $this->config['port']));
$server_socket = @socket_create_listen($this->config['port']);
if ($server_socket === false)
{
Socket_Server::stdout(Kohana::lang('socket_server.error_listening', $this->config['port']));
exit(1);
} else {
$server_command_buffer = '';
socket_set_nonblock($server_socket);
$server_is_running = true;
do {
if ($this->pm->processes() < $this->config['max_clients'])
{
if (($client_socket = @socket_accept($server_socket)) === false) {
//Socket_Server::stdout(Kohana::lang('socket_server.error_listening', socket_strerror(socket_last_error($server_socket))));
usleep(100);
} elseif ($client_socket < 0) {
Socket_Server::stdout(Kohana::lang('socket_server.error_failed_connection', socket_strerror($client_socket)));
break;
} else {
// Connection established
$this->clients += 1;
if (($ipc_streams = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false)
{
echo 'stream_socket_pair() failed.';
exit(1);
}
Socket_Server::stdout(Kohana::lang('socket_server.info_new_client', $this->clients));
$resource = new System_Resource_Model($client_socket, 'socket_close');
$pid = pcntl_fork();
if ($pid == -1)
{
// Failed to fork
Socket_Server::stdout('Failed to fork! Shutting down!');
$server_is_running = false; // trigger shutdown
} else {
$ppid = ($pid == 0) ? posix_getpid() : $pid;
$child_process = new Child_Process_Model($ppid, $ipc_streams[1]);
$child_process->add_resource($resource, $ppid.'socket');
$child_process->add_ipc_stream($ipc_streams[1]);
$this->pm->add_process($ppid, $child_process);
$this->pm->add_ipc_stream($ipc_streams[0], $ppid); // unnecessary
if ($pid == 0)
{
// Child process
$child_process->ipc_write('Client #'.$this->clients." connected to $ppid\n");
$this->child_process($child_process);
} else {
// Parent process
Socket_Server::stdout('Started child with pid: '.$pid);
}
}
}
} else {
Socket_Server::stdout('Max clients reached. Rejecting additional attempts.');
}
$read_streams = array(STDIN);
$client_input = $this->pm->ipc_read_select($this->config['read_size']);
if (count($client_input) > 0)
{
foreach($client_input as $pid => $ipc_input)
{
Socket_Server::stdout('Read a command from IPC ['.$pid.']: '.$ipc_input);
$this->ipc_received($ipc_input);
if (trim($ipc_input) == 'shutdown')
{
$server_is_running = false; // trigger break in while
break; // NOW!
}
}
}
if (false === ($num_changed_streams = stream_select($read_streams, $write_stream = NULL, $exception_stream = NULL, 0, 0)))
{
/* Error handling */
} elseif ($num_changed_streams > 0) {
/* At least on one of the streams something interesting happened */
foreach($read_streams as $key => $stream)
{
$command = trim(fread($read_streams[$key], $this->config['read_size']));
$args = array();
if ($read_streams[$key] == STDIN)
{
Socket_Server::stdout('Read a command from console: '.$command);
} elseif ($read_streams[$key] == $ipc_streams[0]) {
Socket_Server::stdout('Read a command from IPC: '.$command);
}
// handle commands from console and clients the same (not a real good idea...)
$this->ipc_received($command);
if ($command == 'shutdown')
{
$server_is_running = false; // trigger break in while
break; // NOW!
}
}
}
} while ($server_is_running);
Socket_Server::stdout(Kohana::lang('socket_server.info_shutdown'));
// handle external facing sockets
@socket_shutdown($server_socket);
@socket_close($server_socket);
// handle inter process communication streams
$this->pm->kill_all();
}
}
private function child_process(Child_Process_Model &$child)
{
$client_socket = $child->get_resource($child->pid.'socket');
$msg = "\n" . Kohana::lang('socket_server.client_welcome_msg') . "\n";
socket_write($client_socket->rs, $msg, strlen($msg));
$keep_client_open = true;
$cur_buf = '';
$write_stream = NULL;
$exception_stream = NULL;
do {
// first we check the stream (server IPC connection)
$server_message = $child->ipc_read_select();
if (NULL != $server_message)
{
$command_from_server = $this->simple_rpc($server_message);
if ($command_from_server[0] == 'ping')
{
$child->ipc_write("pong\n");
} elseif ($command_from_server[0] == 'quit') {
$keep_client_open = false;
break;
} elseif ($command_from_server[0] == 'broadcast') {
if (count($command_from_server[1]) == 1)
{
$args = substr($command_from_server[1][0], strpos($command_from_server[1][0], '"') + 1, strrpos($command_from_server[1][0], '"') - strpos($command_from_server[1][0], '"') - 1);
$msg = 'Broadcast: '.stripslashes($command_from_server[1][0])."\n";
socket_write($client_socket->rs, $msg, strlen($msg));
}
}
}
// then we check the client connection socket (timeout in 1 second from listening... we're good listeners)
$read_socket = array($client_socket->rs);
if (false === ($num_changed_sockets = socket_select($read_socket, $write_stream, $exception_stream, 1, 0)))
{
/* Error handling */
} elseif ($num_changed_sockets > 0) {
/* At least on one of the streams something interesting happened */
$buffer = @socket_read($client_socket->rs, $this->config['read_size']);
if ($buffer === false)
{
// unexpected error, client probably was disconnected
$keep_client_open = false;
$msg_to_server = "kill:$ppid\n";
Socket_Server::stdout('Broken Pipe. Sending IPC kill message to parent');
if (false === $child->ipc_write($msg_to_server))
{
Socket_Server::stdout('Error sending IPC message');
}
} else {
$command = trim($buffer);
}
if ($command == 'quit') {
$msg_to_server = 'kill:'.$child->pid."\n";
Socket_Server::stdout('Sending IPC kill message to parent');
if (false === $child->ipc_write($msg_to_server))
{
Socket_Server::stdout('Error sending IPC message');
}
//break;
} elseif ($command == 'shutdown') {
$msg = "You have started a shutdown!\n";
socket_write($client_socket->rs, $msg, strlen($msg));
Socket_Server::stdout('Shutdown server from client.');
$msg_to_server = "shutdown\n";
if (false === $child->ipc_write($msg_to_server))
{
Socket_Server::stdout('Error sending IPC message');
}
//$keep_client_open = false;
//break;
} elseif ($command == 'ping') {
$msg_to_server = "ping\n";
if (false === $child->ipc_write($msg_to_server))
{
Socket_Server::stdout('Error sending IPC