fluentd 是一个非常好用的日志收集与分发处理程序,官网:http://www.fluentd.org/
我在官方 PHP 客户端(fluent-logger-php)的基础上改写了一个单文件的 Fluent 类库,并增加了更多功能,例如 tcp 方式支持 require_ack_response 参数。该参数用于让服务器确认日志是否推送成功,避免因网络问题推送失败却无从得知。
但实测发现,如果每条 log 都单独等待一次 ack,原本推送 10 万条记录只需 3~5 秒,开启后可能需要几分钟,性能会大打折扣。
因此我加入了 add() 方法:可以先加入多条 log,再一次性 push 到服务器并只进行 1 次 ack,这样性能就很好了。
另外,使用官方 PHP 程序推送大量日志后,会突然出现 “incoming chunk is broken” 错误。本类库在断线重连后会从头重新发送整个数据包,这个问题应该也已解决,我会再多测试几天观察情况。
[2016-03-20] 更新:HTTP 协议支持批量提交更多内容
使用方法:
// Usage example: buffer 10000 records with ACK enabled and push them in batches.
$option = array
(
    'require_ack_response' => 1,    // enable ACK
    'max_buffer_length'    => 2000, // add() pushes automatically every 2000 records
);
$f = new Fluent('tcp://27x004.xd.com:2600', $option);
$t = microtime(1);
for ($i = 1; $i <= 10000; $i++)
{
    // Batch records; once the buffer reaches 2000 entries it is pushed automatically
    $f->add('test.abc', array('i' => $i), time()); // the 3rd argument (time()) is optional
}
// Push whatever is still buffered
var_dump($f->push('test.abc'));
echo "\n\n", microtime(1) - $t , "\n";
下面是源代码
<?php
/**
 * Fluent log shipping core class.
 *
 * Once the root config sets `$config['log']['fluent'] = 'tcp://127.0.0.1:24224/'`,
 * `self::log('myapp.test.debug', $_SERVER)` uses this class by default.
 *
 *     Fluent::instance('tcp://127.0.0.1:24224/')->push('xd.game.test', ["test"=>"hello"]);
 *
 *     Fluent::instance('unix:///full/path/to/my/socket.sock')->push('xd.game.test', ["test"=>"hello"]);
 *
 * Transports: `tcp://`, `udp://` and `unix://` send each batch as a single JSON message over
 * a stream socket (optionally waiting for an ACK). `http://` POSTs the batch as the `json`
 * form field to `<server>/<tag>`.
 *
 * @see https://github.com/fluent/fluent-logger-php
 * @author 呼吸二氧化碳 <jonwang@myqee.com>
 * @category Core
 * @package Classes
 * @copyright Copyright (c) 2008-2016 myqee.com
 * @license http://www.myqee.com/license.html
 */
class Fluent
{
    /** Default connect timeout in seconds (`connection_timeout` option). */
    const CONNECTION_TIMEOUT = 3;

    /** Default socket read/write timeout in seconds (`socket_timeout` option). */
    const SOCKET_TIMEOUT = 3;

    /** Default retry limit for zero-byte writes (`max_write_retry` option). */
    const MAX_WRITE_RETRY = 5;

    /** Sleep between retries in usleep mode, in microseconds: 1000 means 0.001 sec. */
    const USLEEP_WAIT = 1000;

    /**
     * Whether ACK is enabled by default
     *
     * @var bool
     */
    const REQUIRE_ACK_RESPONSE = true;

    /**
     * Backoff strategies used when fwrite() writes 0 bytes (default: usleep).
     *
     * Exponential mode sleeps `backoff_base ^ retry` milliseconds, with `retry` starting at 0.
     * With the default base of 3:
     *
     * retry | wait
     * 0     | 0.001 sec
     * 1     | 0.003 sec
     * 2     | 0.009 sec
     * 3     | 0.027 sec
     * 4     | 0.081 sec
     * 5     | 0.243 sec
     **/
    const BACKOFF_TYPE_EXPONENTIAL = 0x01;
    const BACKOFF_TYPE_USLEEP = 0x02;

    /**
     * Server address, e.g. `tcp://127.0.0.1:24224`
     * (for http, trailing slashes and spaces are trimmed)
     *
     * @var string
     */
    protected $transport;

    /** @var resource|null */
    protected $socket;

    /** @var bool Whether the http:// transport is used */
    protected $is_http = false;

    /**
     * Buffered records, keyed by tag.
     *
     * Socket transports store `array(time, record)` pairs. The http transport stores the
     * record itself with a `time` field added.
     *
     * @var array
     */
    protected $data = [];

    /** @var array Options; values passed to the constructor are merged over these defaults */
    protected $options = array
    (
        'socket_timeout' => self::SOCKET_TIMEOUT,
        'connection_timeout' => self::CONNECTION_TIMEOUT,
        'backoff_mode' => self::BACKOFF_TYPE_USLEEP,
        'backoff_base' => 3,
        'usleep_wait' => self::USLEEP_WAIT,
        'persistent' => true,
        'retry_socket' => true,
        'max_write_retry' => self::MAX_WRITE_RETRY,
        'require_ack_response' => self::REQUIRE_ACK_RESPONSE,
        // add() pushes a tag automatically once it holds this many records
        'max_buffer_length' => 1000,
    );

    /**
     * Instances created by instance(), keyed by server address
     *
     * @var Fluent[]
     */
    protected static $instance = array();

    /**
     * @param string $server Server address, e.g. `tcp://127.0.0.1:24224`, `unix:///path/to.sock`, `http://127.0.0.1:9880`
     * @param array  $option Overrides for the default options
     * @throws Exception when the address has no `scheme://` prefix or the scheme is unsupported
     */
    function __construct($server, array $option = array())
    {
        $this->transport = $server;
        $pos = strpos($server, '://');
        if ($pos === false)
        {
            throw new Exception("fluent config error");
        }
        $protocol = substr($server, 0, $pos);
        if (!in_array($protocol, array('tcp', 'udp', 'unix', 'http'), true))
        {
            throw new Exception("transport `{$protocol}` does not support");
        }
        if ($protocol === 'http')
        {
            // Push over HTTP
            $this->is_http = true;
            $this->transport = rtrim($this->transport, '/ ');
        }
        if ($option)
        {
            $this->options = array_merge($this->options, $option);
        }
    }

    /**
     * Flush any buffered records, then close the socket unless it is persistent.
     *
     * @return void
     */
    public function __destruct()
    {
        if ($this->data)
        {
            // Push everything still buffered
            foreach (array_keys($this->data) as $tag)
            {
                $this->push($tag);
            }
        }
        if (!$this->get_option('persistent', false) && is_resource($this->socket))
        {
            @fclose($this->socket);
        }
    }

    /**
     * Return the shared Fluent instance for a server address, creating it on first use.
     *
     * @param string $server Server address, see __construct()
     * @param array  $option Options; only applied when the instance is first created
     * @return Fluent
     */
    public static function instance($server, array $option = array())
    {
        if (!isset(self::$instance[$server]))
        {
            self::$instance[$server] = new Fluent($server, $option);
        }
        return self::$instance[$server];
    }

    /**
     * Buffer a record without sending it.
     *
     * With ACK enabled, add records in bulk and then push() them. Once a tag holds
     * `max_buffer_length` records, it is pushed to the server automatically.
     *
     *     $fluent = new Fluent('tcp://127.0.0.1:24224/');
     *     $fluent->add('debug.test1', array('a' => 1));
     *     $fluent->add('debug.test2', array('a' => 2));
     *     $fluent->add('debug.test1', array('a' => 1));
     *
     *     var_dump($fluent->push('debug.test1'));
     *     var_dump($fluent->push('debug.test2'));
     *
     * @param string $tag  tag
     * @param array  $data record
     * @param int    $time timestamp of the record; defaults to the current time
     * @return bool true when buffered or when the automatic push succeeded, false when the automatic push failed
     */
    public function add($tag, array $data, $time = null)
    {
        $this->_add($tag, $data, $time);
        if (count($this->data[$tag]) >= $this->options['max_buffer_length'])
        {
            return $this->push($tag);
        }
        return true;
    }

    /**
     * Append one record to the buffer of $tag, in the layout the transport expects.
     *
     * @param string   $tag
     * @param mixed    $data
     * @param int|null $time falsy means "now"
     */
    protected function _add($tag, $data, $time)
    {
        if ($this->is_http)
        {
            // HTTP carries the timestamp inside the record; an existing `time` field wins
            if (!isset($data['time']))
            {
                $data['time'] = $time ? $time : time();
            }
            $this->data[$tag][] = $data;
        }
        else
        {
            // Socket transports send [time, record] entries
            $this->data[$tag][] = array($time ? $time : time(), $data);
        }
    }

    /**
     * Push the buffered records of $tag (plus $data, if given) to the server.
     *
     * The buffer of $tag is cleared only when the push succeeds. On failure it is kept, so a
     * later push() (or the destructor) retries it. The exception is a batch that cannot be
     * JSON-encoded at all: it is logged via process_error() and dropped.
     *
     * @param string $tag  tag
     * @param array  $data optional record to add before pushing
     * @param int    $time timestamp of $data; defaults to the current time
     * @return bool true on success or when there is nothing to push
     */
    public function push($tag, $data = null, $time = null)
    {
        if ($data)
        {
            $this->_add($tag, $data, $time);
        }
        if (!isset($this->data[$tag]) || !$this->data[$tag])return true;
        if ($this->is_http)
        {
            $rs = $this->push_with_http($tag, $time);
        }
        else
        {
            $rs = $this->push_with_socket($tag);
        }
        if ($rs)
        {
            unset($this->data[$tag]);
        }
        return $rs;
    }

    /**
     * POST the buffered records of $tag to `<server>/<tag>` as the `json` form field.
     *
     * @param string   $tag
     * @param int|null $time unused; kept for signature compatibility
     * @return bool true when the server answered with an empty body
     */
    protected function push_with_http($tag, $time)
    {
        $data = $this->data[$tag];
        $packed = self::json_encode($data);
        if ($packed === false)
        {
            // This batch can never be encoded: log it and drop it so it cannot block the buffer forever
            $this->process_error($tag, $data, 'could not encode message (json error '. json_last_error() .')');
            unset($this->data[$tag]);
            return false;
        }
        $opts = array
        (
            'http' => array
            (
                'method'  => 'POST',
                // State the body type explicitly; otherwise PHP assumes form encoding and raises a notice
                'header'  => 'Content-Type: application/x-www-form-urlencoded',
                'content' => 'json='. urlencode($packed),
                // Use the configured socket timeout instead of PHP's default_socket_timeout
                'timeout' => $this->get_option('socket_timeout', self::SOCKET_TIMEOUT),
            ),
        );
        $context = stream_context_create($opts);
        $url = $this->transport .'/'. $tag;
        $ret = file_get_contents($url, false, $context);
        if ($ret !== '')
        {
            $this->process_error($tag, $data, $ret === false ? 'http request failed' : 'unexpected http response: '. $ret);
            return false;
        }
        return true;
    }

    /**
     * Send the buffered records of $tag as one JSON message over the stream socket.
     *
     * The message is `[tag, entries]`, or `[tag, entries, {"chunk": id}]` when
     * `require_ack_response` is on. In the ACK case the reply must carry the same id in its
     * `ack` field.
     *
     * @param string $tag
     * @return bool
     */
    protected function push_with_socket($tag)
    {
        $data = $this->data[$tag];
        $ack = $this->get_option('require_ack_response');
        if ($ack)
        {
            // Unique per push; a fixed-format key is not needed because the reply is parsed, not sized
            $ack_key = uniqid('a', true);
            $packed = self::json_encode(array($tag, $data, array('chunk' => $ack_key)));
        }
        else
        {
            $ack_key = null;
            $packed = self::json_encode(array($tag, $data));
        }
        if ($packed === false)
        {
            // This batch can never be encoded: log it and drop it so it cannot block the buffer forever
            $this->process_error($tag, $data, 'could not encode message (json error '. json_last_error() .')');
            unset($this->data[$tag]);
            return false;
        }
        $buffer = $packed;
        $length = strlen($packed);
        $retry = $written = 0;

        try
        {
            $this->reconnect();
        }
        catch (Exception $e)
        {
            $this->close();
            $this->process_error($tag, $data, $e->getMessage());
            return false;
        }

        try
        {
            // fwrite() on sockets may write only part of the buffer, so loop until everything is sent
            while ($written < $length)
            {
                $nwrite = $this->write($buffer);
                if ($nwrite === false)
                {
                    // could not write messages to the socket.
                    // e.g) Resource temporarily unavailable
                    throw new Exception("could not write message");
                }
                else if ($nwrite === '')
                {
                    // sometimes fwrite returns null string.
                    // probably connection aborted.
                    throw new Exception("connection aborted");
                }
                else if ($nwrite === 0)
                {
                    if (!$this->get_option("retry_socket", true))
                    {
                        $this->process_error($tag, $data, "could not send entities");
                        return false;
                    }
                    if ($retry > $this->get_option("max_write_retry", self::MAX_WRITE_RETRY))
                    {
                        throw new Exception("failed fwrite retry: retry count exceeds limit.");
                    }
                    $errors = error_get_last();
                    if ($errors)
                    {
                        if (isset($errors['message']) && strpos($errors['message'], 'errno=32 ') !== false)
                        {
                            /* breaking pipes: we have to close socket manually */
                            $this->close();
                            $this->reconnect();
                            // Restart from the first byte after reconnecting; a half-sent message
                            // is what makes the server report "incoming chunk is broken"
                            $written = 0;
                            $buffer = $packed;
                            // Count the reconnect as a retry: error_get_last() keeps returning the same
                            // stale errno=32 message, so without this the loop could reconnect forever
                            $retry++;
                            continue;
                        }
                        else if (isset($errors['message']) && strpos($errors['message'], 'errno=11 ') !== false)
                        {
                            // we can ignore EAGAIN message. just retry.
                        }
                        else
                        {
                            error_log("unhandled error detected. please report this issue to http://github.com/fluent/fluent-logger-php/issues: ". var_export($errors, true));
                        }
                    }
                    if ($this->get_option('backoff_mode', self::BACKOFF_TYPE_EXPONENTIAL) == self::BACKOFF_TYPE_EXPONENTIAL)
                    {
                        // Honour the configured base instead of a hard-coded 3
                        $this->backoff_exponential($this->get_option('backoff_base', 3), $retry);
                    }
                    else
                    {
                        usleep($this->get_option("usleep_wait", self::USLEEP_WAIT));
                    }
                    $retry++;
                    continue;
                }
                $written += $nwrite;
                $buffer = substr($packed, $written);
            }

            if ($ack)
            {
                return $this->check_ack($tag, $data, $ack_key);
            }
        }
        catch (Exception $e)
        {
            $this->close();
            $this->process_error($tag, $data, $e->getMessage());
            return false;
        }
        return true;
    }

    /**
     * Read the server's ACK reply and compare it with the chunk id that was sent.
     *
     * On any failure the connection is closed, because unread or partial reply bytes would
     * otherwise be mistaken for the reply to the next push.
     *
     * @param string $tag
     * @param array  $data    the batch, used for error logging only
     * @param string $ack_key the chunk id sent with the batch
     * @return bool
     */
    protected function check_ack($tag, $data, $ack_key)
    {
        $response = $this->read_ack_response();
        $decoded = ($response === '') ? null : json_decode($response, true);
        if (!is_array($decoded) || !isset($decoded['ack']))
        {
            $this->close();
            $this->process_error($tag, $data, 'no valid ack response received');
            return false;
        }
        if ($decoded['ack'] !== $ack_key)
        {
            $this->close();
            $this->process_error($tag, $data, 'ack in response and chunk id in sent data are different');
            return false;
        }
        return true;
    }

    /**
     * Read a JSON ACK reply from the socket.
     *
     * Reads until the closing brace arrives, the stream times out or reaches EOF (fread
     * returns '' or false), or the reply grows implausibly large. A fixed-size read would
     * only fit a key of one exact length.
     *
     * @return string the raw reply, possibly empty or incomplete
     */
    protected function read_ack_response()
    {
        $response = '';
        while (strpos($response, '}') === false && strlen($response) < 4096)
        {
            $chunk = @fread($this->socket, 1024);
            if ($chunk === false || $chunk === '')
            {
                break;
            }
            $response .= $chunk;
        }
        return $response;
    }

    /**
     * write data
     *
     * @param string $buffer
     * @return mixed integer|false
     */
    protected function write($buffer)
    {
        // Write errors are handled by the caller's retry loop, so the warning is suppressed here.
        return @fwrite($this->socket, $buffer);
    }

    /**
     * Create a connection to the configured fluentd.
     *
     * @throws \Exception
     */
    protected function connect()
    {
        $connect_options = STREAM_CLIENT_CONNECT;
        if ($this->get_option("persistent", false))
        {
            $connect_options |= STREAM_CLIENT_PERSISTENT;
        }
        // could not suppress warning without ini setting.
        // for now, we use error control operators.
        $socket = @stream_socket_client($this->transport, $errno, $errstr, $this->get_option("connection_timeout", self::CONNECTION_TIMEOUT), $connect_options);
        if (!$socket)
        {
            // error_get_last() may be null, so fall back to the errstr/errno out-parameters
            $errors = error_get_last();
            if ($errors && isset($errors['message']))
            {
                $message = $errors['message'];
            }
            else if ($errstr)
            {
                $message = "{$errstr} ({$errno})";
            }
            else
            {
                $message = 'could not connect to '. $this->transport;
            }
            throw new Exception($message);
        }
        // set read / write timeout.
        stream_set_timeout($socket, $this->get_option("socket_timeout", self::SOCKET_TIMEOUT));
        $this->socket = $socket;
    }

    /**
     * Connect only if there is no open socket yet.
     *
     * @return void
     */
    protected function reconnect()
    {
        if (!is_resource($this->socket))
        {
            $this->connect();
        }
    }

    /**
     * close the socket
     *
     * @return void
     */
    public function close()
    {
        if (is_resource($this->socket))
        {
            fclose($this->socket);
        }
        $this->socket = null;
    }

    /**
     * get specified option's value
     *
     * @param $key
     * @param null $default returned when the option is unset or null
     * @return mixed
     */
    protected function get_option($key, $default = null)
    {
        $result = $default;
        if (isset($this->options[$key]))
        {
            $result = $this->options[$key];
        }
        return $result;
    }

    /**
     * Sleep `$base ^ $attempt` milliseconds.
     *
     * @param $base int
     * @param $attempt int
     */
    protected function backoff_exponential($base, $attempt)
    {
        usleep(pow($base, $attempt) * 1000);
    }

    /**
     * Handle an error by writing the message, tag and data to the PHP error log.
     *
     * @param $tag
     * @param $data
     * @param $error
     */
    protected function process_error($tag, $data, $error)
    {
        error_log(sprintf("%s %s: %s", $error, $tag, json_encode($data)));
    }

    /**
     * JSON-encode with unescaped unicode, degrading instead of failing where possible.
     *
     * json_encode() reports failure by returning false rather than throwing, so the fallbacks
     * must check the return value. As a last resort, JSON_PARTIAL_OUTPUT_ON_ERROR (when
     * available) replaces unencodable values such as invalid UTF-8 strings with null.
     *
     * @param array $data
     * @return string|false false only if every attempt fails
     */
    protected static function json_encode(array $data)
    {
        $flags = defined('JSON_UNESCAPED_UNICODE') ? JSON_UNESCAPED_UNICODE : 0;
        $json = json_encode($data, $flags);
        if ($json === false && $flags)
        {
            $json = json_encode($data);
        }
        if ($json === false && defined('JSON_PARTIAL_OUTPUT_ON_ERROR'))
        {
            $json = json_encode($data, $flags | JSON_PARTIAL_OUTPUT_ON_ERROR);
        }
        return $json;
    }
}
// The source contains a second declaration of class Fluent. An unconditional duplicate
// class declaration is a fatal compile error, so this copy is only declared when no Fluent
// class exists yet (the unconditional declaration earlier in the file is bound first).
if (!class_exists('Fluent', false))
{
    /**
     * Fluent log shipping core class.
     *
     * Once the root config sets `$config['log']['fluent'] = 'tcp://127.0.0.1:24224/'`,
     * `self::log('myapp.test.debug', $_SERVER)` uses this class by default.
     *
     *     Fluent::instance('tcp://127.0.0.1:24224/')->push('xd.game.test', ["test"=>"hello"]);
     *
     *     Fluent::instance('unix:///full/path/to/my/socket.sock')->push('xd.game.test', ["test"=>"hello"]);
     *
     * Transports: `tcp://`, `udp://` and `unix://` send each batch as a single JSON message over
     * a stream socket (optionally waiting for an ACK). `http://` POSTs the batch as the `json`
     * form field to `<server>/<tag>`.
     *
     * @see https://github.com/fluent/fluent-logger-php
     * @author 呼吸二氧化碳 <jonwang@myqee.com>
     * @category Core
     * @package Classes
     * @copyright Copyright (c) 2008-2016 myqee.com
     * @license http://www.myqee.com/license.html
     */
    class Fluent
    {
        /** Default connect timeout in seconds (`connection_timeout` option). */
        const CONNECTION_TIMEOUT = 3;

        /** Default socket read/write timeout in seconds (`socket_timeout` option). */
        const SOCKET_TIMEOUT = 3;

        /** Default retry limit for zero-byte writes (`max_write_retry` option). */
        const MAX_WRITE_RETRY = 5;

        /** Sleep between retries in usleep mode, in microseconds: 1000 means 0.001 sec. */
        const USLEEP_WAIT = 1000;

        /**
         * Whether ACK is enabled by default
         *
         * @var bool
         */
        const REQUIRE_ACK_RESPONSE = true;

        /**
         * Backoff strategies used when fwrite() writes 0 bytes (default: usleep).
         *
         * Exponential mode sleeps `backoff_base ^ retry` milliseconds, with `retry` starting at 0.
         * With the default base of 3:
         *
         * retry | wait
         * 0     | 0.001 sec
         * 1     | 0.003 sec
         * 2     | 0.009 sec
         * 3     | 0.027 sec
         * 4     | 0.081 sec
         * 5     | 0.243 sec
         **/
        const BACKOFF_TYPE_EXPONENTIAL = 0x01;
        const BACKOFF_TYPE_USLEEP = 0x02;

        /**
         * Server address, e.g. `tcp://127.0.0.1:24224`
         * (for http, trailing slashes and spaces are trimmed)
         *
         * @var string
         */
        protected $transport;

        /** @var resource|null */
        protected $socket;

        /** @var bool Whether the http:// transport is used */
        protected $is_http = false;

        /**
         * Buffered records, keyed by tag.
         *
         * Socket transports store `array(time, record)` pairs. The http transport stores the
         * record itself with a `time` field added.
         *
         * @var array
         */
        protected $data = [];

        /** @var array Options; values passed to the constructor are merged over these defaults */
        protected $options = array
        (
            'socket_timeout' => self::SOCKET_TIMEOUT,
            'connection_timeout' => self::CONNECTION_TIMEOUT,
            'backoff_mode' => self::BACKOFF_TYPE_USLEEP,
            'backoff_base' => 3,
            'usleep_wait' => self::USLEEP_WAIT,
            'persistent' => true,
            'retry_socket' => true,
            'max_write_retry' => self::MAX_WRITE_RETRY,
            'require_ack_response' => self::REQUIRE_ACK_RESPONSE,
            // add() pushes a tag automatically once it holds this many records
            'max_buffer_length' => 1000,
        );

        /**
         * Instances created by instance(), keyed by server address
         *
         * @var Fluent[]
         */
        protected static $instance = array();

        /**
         * @param string $server Server address, e.g. `tcp://127.0.0.1:24224`, `unix:///path/to.sock`, `http://127.0.0.1:9880`
         * @param array  $option Overrides for the default options
         * @throws Exception when the address has no `scheme://` prefix or the scheme is unsupported
         */
        function __construct($server, array $option = array())
        {
            $this->transport = $server;
            $pos = strpos($server, '://');
            if ($pos === false)
            {
                throw new Exception("fluent config error");
            }
            $protocol = substr($server, 0, $pos);
            if (!in_array($protocol, array('tcp', 'udp', 'unix', 'http'), true))
            {
                throw new Exception("transport `{$protocol}` does not support");
            }
            if ($protocol === 'http')
            {
                // Push over HTTP
                $this->is_http = true;
                $this->transport = rtrim($this->transport, '/ ');
            }
            if ($option)
            {
                $this->options = array_merge($this->options, $option);
            }
        }

        /**
         * Flush any buffered records, then close the socket unless it is persistent.
         *
         * @return void
         */
        public function __destruct()
        {
            if ($this->data)
            {
                // Push everything still buffered
                foreach (array_keys($this->data) as $tag)
                {
                    $this->push($tag);
                }
            }
            if (!$this->get_option('persistent', false) && is_resource($this->socket))
            {
                @fclose($this->socket);
            }
        }

        /**
         * Return the shared Fluent instance for a server address, creating it on first use.
         *
         * @param string $server Server address, see __construct()
         * @param array  $option Options; only applied when the instance is first created
         * @return Fluent
         */
        public static function instance($server, array $option = array())
        {
            if (!isset(self::$instance[$server]))
            {
                self::$instance[$server] = new Fluent($server, $option);
            }
            return self::$instance[$server];
        }

        /**
         * Buffer a record without sending it.
         *
         * With ACK enabled, add records in bulk and then push() them. Once a tag holds
         * `max_buffer_length` records, it is pushed to the server automatically.
         *
         *     $fluent = new Fluent('tcp://127.0.0.1:24224/');
         *     $fluent->add('debug.test1', array('a' => 1));
         *     $fluent->add('debug.test2', array('a' => 2));
         *     $fluent->add('debug.test1', array('a' => 1));
         *
         *     var_dump($fluent->push('debug.test1'));
         *     var_dump($fluent->push('debug.test2'));
         *
         * @param string $tag  tag
         * @param array  $data record
         * @param int    $time timestamp of the record; defaults to the current time
         * @return bool true when buffered or when the automatic push succeeded, false when the automatic push failed
         */
        public function add($tag, array $data, $time = null)
        {
            $this->_add($tag, $data, $time);
            if (count($this->data[$tag]) >= $this->options['max_buffer_length'])
            {
                return $this->push($tag);
            }
            return true;
        }

        /**
         * Append one record to the buffer of $tag, in the layout the transport expects.
         *
         * @param string   $tag
         * @param mixed    $data
         * @param int|null $time falsy means "now"
         */
        protected function _add($tag, $data, $time)
        {
            if ($this->is_http)
            {
                // HTTP carries the timestamp inside the record; an existing `time` field wins
                if (!isset($data['time']))
                {
                    $data['time'] = $time ? $time : time();
                }
                $this->data[$tag][] = $data;
            }
            else
            {
                // Socket transports send [time, record] entries
                $this->data[$tag][] = array($time ? $time : time(), $data);
            }
        }

        /**
         * Push the buffered records of $tag (plus $data, if given) to the server.
         *
         * The buffer of $tag is cleared only when the push succeeds. On failure it is kept, so
         * a later push() (or the destructor) retries it. The exception is a batch that cannot be
         * JSON-encoded at all: it is logged via process_error() and dropped.
         *
         * @param string $tag  tag
         * @param array  $data optional record to add before pushing
         * @param int    $time timestamp of $data; defaults to the current time
         * @return bool true on success or when there is nothing to push
         */
        public function push($tag, $data = null, $time = null)
        {
            if ($data)
            {
                $this->_add($tag, $data, $time);
            }
            if (!isset($this->data[$tag]) || !$this->data[$tag])return true;
            if ($this->is_http)
            {
                $rs = $this->push_with_http($tag, $time);
            }
            else
            {
                $rs = $this->push_with_socket($tag);
            }
            if ($rs)
            {
                unset($this->data[$tag]);
            }
            return $rs;
        }

        /**
         * POST the buffered records of $tag to `<server>/<tag>` as the `json` form field.
         *
         * @param string   $tag
         * @param int|null $time unused; kept for signature compatibility
         * @return bool true when the server answered with an empty body
         */
        protected function push_with_http($tag, $time)
        {
            $data = $this->data[$tag];
            $packed = self::json_encode($data);
            if ($packed === false)
            {
                // This batch can never be encoded: log it and drop it so it cannot block the buffer forever
                $this->process_error($tag, $data, 'could not encode message (json error '. json_last_error() .')');
                unset($this->data[$tag]);
                return false;
            }
            $opts = array
            (
                'http' => array
                (
                    'method'  => 'POST',
                    // State the body type explicitly; otherwise PHP assumes form encoding and raises a notice
                    'header'  => 'Content-Type: application/x-www-form-urlencoded',
                    'content' => 'json='. urlencode($packed),
                    // Use the configured socket timeout instead of PHP's default_socket_timeout
                    'timeout' => $this->get_option('socket_timeout', self::SOCKET_TIMEOUT),
                ),
            );
            $context = stream_context_create($opts);
            $url = $this->transport .'/'. $tag;
            $ret = file_get_contents($url, false, $context);
            if ($ret !== '')
            {
                $this->process_error($tag, $data, $ret === false ? 'http request failed' : 'unexpected http response: '. $ret);
                return false;
            }
            return true;
        }

        /**
         * Send the buffered records of $tag as one JSON message over the stream socket.
         *
         * The message is `[tag, entries]`, or `[tag, entries, {"chunk": id}]` when
         * `require_ack_response` is on. In the ACK case the reply must carry the same id in its
         * `ack` field.
         *
         * @param string $tag
         * @return bool
         */
        protected function push_with_socket($tag)
        {
            $data = $this->data[$tag];
            $ack = $this->get_option('require_ack_response');
            if ($ack)
            {
                // Unique per push; a fixed-format key is not needed because the reply is parsed, not sized
                $ack_key = uniqid('a', true);
                $packed = self::json_encode(array($tag, $data, array('chunk' => $ack_key)));
            }
            else
            {
                $ack_key = null;
                $packed = self::json_encode(array($tag, $data));
            }
            if ($packed === false)
            {
                // This batch can never be encoded: log it and drop it so it cannot block the buffer forever
                $this->process_error($tag, $data, 'could not encode message (json error '. json_last_error() .')');
                unset($this->data[$tag]);
                return false;
            }
            $buffer = $packed;
            $length = strlen($packed);
            $retry = $written = 0;

            try
            {
                $this->reconnect();
            }
            catch (Exception $e)
            {
                $this->close();
                $this->process_error($tag, $data, $e->getMessage());
                return false;
            }

            try
            {
                // fwrite() on sockets may write only part of the buffer, so loop until everything is sent
                while ($written < $length)
                {
                    $nwrite = $this->write($buffer);
                    if ($nwrite === false)
                    {
                        // could not write messages to the socket.
                        // e.g) Resource temporarily unavailable
                        throw new Exception("could not write message");
                    }
                    else if ($nwrite === '')
                    {
                        // sometimes fwrite returns null string.
                        // probably connection aborted.
                        throw new Exception("connection aborted");
                    }
                    else if ($nwrite === 0)
                    {
                        if (!$this->get_option("retry_socket", true))
                        {
                            $this->process_error($tag, $data, "could not send entities");
                            return false;
                        }
                        if ($retry > $this->get_option("max_write_retry", self::MAX_WRITE_RETRY))
                        {
                            throw new Exception("failed fwrite retry: retry count exceeds limit.");
                        }
                        $errors = error_get_last();
                        if ($errors)
                        {
                            if (isset($errors['message']) && strpos($errors['message'], 'errno=32 ') !== false)
                            {
                                /* breaking pipes: we have to close socket manually */
                                $this->close();
                                $this->reconnect();
                                // Restart from the first byte after reconnecting; a half-sent message
                                // is what makes the server report "incoming chunk is broken"
                                $written = 0;
                                $buffer = $packed;
                                // Count the reconnect as a retry: error_get_last() keeps returning the same
                                // stale errno=32 message, so without this the loop could reconnect forever
                                $retry++;
                                continue;
                            }
                            else if (isset($errors['message']) && strpos($errors['message'], 'errno=11 ') !== false)
                            {
                                // we can ignore EAGAIN message. just retry.
                            }
                            else
                            {
                                error_log("unhandled error detected. please report this issue to http://github.com/fluent/fluent-logger-php/issues: ". var_export($errors, true));
                            }
                        }
                        if ($this->get_option('backoff_mode', self::BACKOFF_TYPE_EXPONENTIAL) == self::BACKOFF_TYPE_EXPONENTIAL)
                        {
                            // Honour the configured base instead of a hard-coded 3
                            $this->backoff_exponential($this->get_option('backoff_base', 3), $retry);
                        }
                        else
                        {
                            usleep($this->get_option("usleep_wait", self::USLEEP_WAIT));
                        }
                        $retry++;
                        continue;
                    }
                    $written += $nwrite;
                    $buffer = substr($packed, $written);
                }

                if ($ack)
                {
                    return $this->check_ack($tag, $data, $ack_key);
                }
            }
            catch (Exception $e)
            {
                $this->close();
                $this->process_error($tag, $data, $e->getMessage());
                return false;
            }
            return true;
        }

        /**
         * Read the server's ACK reply and compare it with the chunk id that was sent.
         *
         * On any failure the connection is closed, because unread or partial reply bytes would
         * otherwise be mistaken for the reply to the next push.
         *
         * @param string $tag
         * @param array  $data    the batch, used for error logging only
         * @param string $ack_key the chunk id sent with the batch
         * @return bool
         */
        protected function check_ack($tag, $data, $ack_key)
        {
            $response = $this->read_ack_response();
            $decoded = ($response === '') ? null : json_decode($response, true);
            if (!is_array($decoded) || !isset($decoded['ack']))
            {
                $this->close();
                $this->process_error($tag, $data, 'no valid ack response received');
                return false;
            }
            if ($decoded['ack'] !== $ack_key)
            {
                $this->close();
                $this->process_error($tag, $data, 'ack in response and chunk id in sent data are different');
                return false;
            }
            return true;
        }

        /**
         * Read a JSON ACK reply from the socket.
         *
         * Reads until the closing brace arrives, the stream times out or reaches EOF (fread
         * returns '' or false), or the reply grows implausibly large. A fixed-size read would
         * only fit a key of one exact length.
         *
         * @return string the raw reply, possibly empty or incomplete
         */
        protected function read_ack_response()
        {
            $response = '';
            while (strpos($response, '}') === false && strlen($response) < 4096)
            {
                $chunk = @fread($this->socket, 1024);
                if ($chunk === false || $chunk === '')
                {
                    break;
                }
                $response .= $chunk;
            }
            return $response;
        }

        /**
         * write data
         *
         * @param string $buffer
         * @return mixed integer|false
         */
        protected function write($buffer)
        {
            // Write errors are handled by the caller's retry loop, so the warning is suppressed here.
            return @fwrite($this->socket, $buffer);
        }

        /**
         * Create a connection to the configured fluentd.
         *
         * @throws \Exception
         */
        protected function connect()
        {
            $connect_options = STREAM_CLIENT_CONNECT;
            if ($this->get_option("persistent", false))
            {
                $connect_options |= STREAM_CLIENT_PERSISTENT;
            }
            // could not suppress warning without ini setting.
            // for now, we use error control operators.
            $socket = @stream_socket_client($this->transport, $errno, $errstr, $this->get_option("connection_timeout", self::CONNECTION_TIMEOUT), $connect_options);
            if (!$socket)
            {
                // error_get_last() may be null, so fall back to the errstr/errno out-parameters
                $errors = error_get_last();
                if ($errors && isset($errors['message']))
                {
                    $message = $errors['message'];
                }
                else if ($errstr)
                {
                    $message = "{$errstr} ({$errno})";
                }
                else
                {
                    $message = 'could not connect to '. $this->transport;
                }
                throw new Exception($message);
            }
            // set read / write timeout.
            stream_set_timeout($socket, $this->get_option("socket_timeout", self::SOCKET_TIMEOUT));
            $this->socket = $socket;
        }

        /**
         * Connect only if there is no open socket yet.
         *
         * @return void
         */
        protected function reconnect()
        {
            if (!is_resource($this->socket))
            {
                $this->connect();
            }
        }

        /**
         * close the socket
         *
         * @return void
         */
        public function close()
        {
            if (is_resource($this->socket))
            {
                fclose($this->socket);
            }
            $this->socket = null;
        }

        /**
         * get specified option's value
         *
         * @param $key
         * @param null $default returned when the option is unset or null
         * @return mixed
         */
        protected function get_option($key, $default = null)
        {
            $result = $default;
            if (isset($this->options[$key]))
            {
                $result = $this->options[$key];
            }
            return $result;
        }

        /**
         * Sleep `$base ^ $attempt` milliseconds.
         *
         * @param $base int
         * @param $attempt int
         */
        protected function backoff_exponential($base, $attempt)
        {
            usleep(pow($base, $attempt) * 1000);
        }

        /**
         * Handle an error by writing the message, tag and data to the PHP error log.
         *
         * @param $tag
         * @param $data
         * @param $error
         */
        protected function process_error($tag, $data, $error)
        {
            error_log(sprintf("%s %s: %s", $error, $tag, json_encode($data)));
        }

        /**
         * JSON-encode with unescaped unicode, degrading instead of failing where possible.
         *
         * json_encode() reports failure by returning false rather than throwing, so the fallbacks
         * must check the return value. As a last resort, JSON_PARTIAL_OUTPUT_ON_ERROR (when
         * available) replaces unencodable values such as invalid UTF-8 strings with null.
         *
         * @param array $data
         * @return string|false false only if every attempt fails
         */
        protected static function json_encode(array $data)
        {
            $flags = defined('JSON_UNESCAPED_UNICODE') ? JSON_UNESCAPED_UNICODE : 0;
            $json = json_encode($data, $flags);
            if ($json === false && $flags)
            {
                $json = json_encode($data);
            }
            if ($json === false && defined('JSON_PARTIAL_OUTPUT_ON_ERROR'))
            {
                $json = json_encode($data, $flags | JSON_PARTIAL_OUTPUT_ON_ERROR);
            }
            return $json;
        }
    }
}