fluentd是一个非常好用的日志分发处理的程序,网站 http://www.fluentd.org/

使用官方的php程序自己改了一个单文件的Fluent的类库,并且加入了更多的支持,比如 tcp 方式支持 require_ack_response 了,这个参数主要是用来请求是否推送成功用的,避免因为网络问题没有推送成功而无法知道。
但是这个功能我实测如果每条log都去ask一下本来推送10w条记录只需要3-5秒,用这个后可能要几分钟时间,所以性能上会大打折扣。
所以我加入了add()方法,可以加入n多条log后一次性push到服务器进行1次ack,这样性能就会很好了。
另外,官网的php程序我在使用中发现在大量日志的推送后会突然出现“incoming chunk is broken”这样的错误,这个应该也是解决了,具体我再多测几天看看情况。

[2016-03-20] 更新http协议支持更多内容的提交

使用方法:

$option = array
(
    'require_ack_response' => 1,    //开启ACK
    'max_buffer_length' => 2000     //每2000条add会自动push一次
);

$f = new Fluent('tcp://27x004.xd.com:2600', $option);
$t = microtime(1);
for($i = 1; $i<= 10000; $i++)
{
    // 批量加入,到达2000条会自动推送
    $f->add('test.abc', array('i'=>$i), time());  // 第3个time()参数可以不传
}
// 推送剩余的
var_dump($f->push('test.abc'));

echo "\n\n", microtime(1) - $t , "\n";

下面是源代码

<?php
/**
 * Fluent日志处理核心类
 *
 * 配置根目录 `$config['log']['fluent'] = 'tcp://127.0.0.1:24224/'` 后
 * 使用 `self::log('myapp.test.debug', $_SERVER)` 默认就可以调用本方法
 *
 *
 *      Fluent::instance('tcp://127.0.0.1:24224/')->push('xd.game.test', ["test"=>"hello"]);
 *
 *      Fluent::instance('unix:///full/path/to/my/socket.sock')->push('xd.game.test', ["test"=>"hello"]);
 *
 *
 * @see        https://github.com/fluent/fluent-logger-php
 * @author     呼吸二氧化碳 <jonwang@myqee.com>
 * @category   Core
 * @package    Classes
 * @copyright  Copyright (c) 2008-2016 myqee.com
 * @license    http://www.myqee.com/license.html
 */

class Fluent
{
    const CONNECTION_TIMEOUT = 3;
    const SOCKET_TIMEOUT     = 3;
    const MAX_WRITE_RETRY    = 5;

    /* 1000 means 0.001 sec */
    const USLEEP_WAIT = 1000;

    /**
     * 是否开启ACK
     *
     * @var bool
     */

    const REQUIRE_ACK_RESPONSE = true;

    /**
     * backoff strategies: default usleep
     *
     * attempts | wait
     * 1        | 0.003 sec
     * 2        | 0.009 sec
     * 3        | 0.027 sec
     * 4        | 0.081 sec
     * 5        | 0.243 sec
     **/

    const BACKOFF_TYPE_EXPONENTIAL = 0x01;
    const BACKOFF_TYPE_USLEEP      = 0x02;

    /**
     * 服务器
     *
     * 例如 `tcp://127.0.0.1:24224`
     *
     * @var string
     */

    protected $transport;

    /* @var resource */
    protected $socket;

    protected $is_http = false;

    protected $data = [];

    protected $options = array
    (
        'socket_timeout'       => self::SOCKET_TIMEOUT,
        'connection_timeout'   => self::CONNECTION_TIMEOUT,
        'backoff_mode'         => self::BACKOFF_TYPE_USLEEP,
        'backoff_base'         => 3,
        'usleep_wait'          => self::USLEEP_WAIT,
        'persistent'           => true,
        'retry_socket'         => true,
        'max_write_retry'      => self::MAX_WRITE_RETRY,
        'require_ack_response' => self::REQUIRE_ACK_RESPONSE,
        'max_buffer_length'    => 1000,
    );

    /**
     * @var Fluent
     */

    protected static $instance = array();

    function __construct($server, array $option = array())
    {
        $this->transport = $server;

        if (($pos = strpos($server, '://')) !== false)
        {
            $protocol = substr($server, 0, $pos);

            if (!in_array($protocol, array('tcp', 'udp', 'unix', 'http')))
            {
                throw new Exception("transport `{$protocol}` does not support");
            }

            if ($protocol === 'http')
            {
                # 使用HTTP推送
              $this->is_http = true;
                $this->transport = rtrim($this->transport, '/ ');
            }
        }
        else
        {
            throw new Exception("fluent config error");
        }

        if ($option)
        {
            $this->options = array_merge($this->options, $option);
        }
    }

    /**
     * destruct objects and socket.
     *
     * @return void
     */

    public function __destruct()
    {
        if ($this->data)
        {
            # 把遗留的数据全部推送完毕
          foreach (array_keys($this->data) as $tag)
            {
                $this->push($tag);
            }
        }

        if (!$this->get_option('persistent', false) && is_resource($this->socket))
        {
            @fclose($this->socket);
        }
    }

    /**
     * 返回Fluent处理对象
     *
     * @return Fluent
     */

    public static function instance($server)
    {
        if (!isset(Fluent::$instance[$server]))
        {
            Fluent::$instance[$server] = new Fluent($server);
        }

        return Fluent::$instance[$server];
    }

    /**
     * 添加数据,添加完毕后并不直接推送
     *
     * 当开启ack后,推荐先批量 add 后再push,当超过 max_buffer_length 后会自动推送到服务器
     *
     *      $fluent = new Fluent('tcp://127.0.0.1:24224/');
     *      $fluent->add('debug.test1', array('a' => 1));
     *      $fluent->add('debug.test2', array('a' => 2));
     *      $fluent->add('debug.test1', array('a' => 1));
     *
     *      var_dump($fluent->push('debug.test1'));
     *      var_dump($fluent->push('debug.test2'));
     *
     * @param string $tag tag内容
     * @param array $data 数据内容
     * @param int $time 标记日志的时间戳,不设置就是当前时间
     */

    public function add($tag, array $data, $time = null)
    {
        $this->_add($tag, $data, $time);

        if (count($this->data[$tag]) >= $this->options['max_buffer_length'])
        {
            return $this->push($tag);
        }

        return true;
    }

    protected function _add($tag, $data, $time)
    {
        if ($this->is_http)
        {
            if (!isset($data['time']))
            {
                $data['time'] = $time ? $time : time();
            }
            $this->data[$tag][] = $data;
        }
        else
        {
            $this->data[$tag][] = array($time ? $time : time(), $data);
        }
    }

    /**
     * 推送数据到服务器
     *
     * @param string $tag tag内容
     * @param array $data 数据内容
     * @param int $time 标记日志的时间戳,不设置就是当前时间
     * @return bool
     * @throws Exception
     */

    public function push($tag, $data = null, $time = null)
    {
        if ($data)
        {
            $this->_add($tag, $data, $time);
        }

        if (!isset($this->data[$tag]) || !$this->data[$tag])return true;

        if ($this->is_http)
        {
            $rs = $this->push_with_http($tag, $time);
        }
        else
        {
            $rs = $this->push_with_socket($tag);
        }

        if ($rs)
        {
            unset($this->data[$tag]);
        }

        return $rs;
    }

    protected function push_with_http($tag, $time)
    {
        $packed  = self::json_encode($this->data[$tag]);
        $opts = array(
            'http' => array
            (
                'method'  => 'POST',
                'content' => 'json='. urlencode($packed)
            )
        );
        $context = stream_context_create($opts);
        $url     = $this->transport .'/'. $tag;

        $ret = file_get_contents($url, false, $context);

        return ($ret !== false && $ret === '');
    }

    protected function push_with_socket($tag)
    {
        $data = $this->data[$tag];
       
        if ($ack = $this->get_option('require_ack_response'))
        {
            $ack_key = 'a'. (microtime(1) * 10000);
            $buffer = $packed = self::json_encode(array($tag, $data, array('chunk' => $ack_key)));
        }
        else
        {
            $ack_key = null;
            $buffer = $packed = self::json_encode(array($tag, $data));
        }

        $length = strlen($packed);
        $retry  = $written = 0;

        try
        {
            $this->reconnect();
        }
        catch (Exception $e)
        {
            $this->close();
            $this->process_error($tag, $data, $e->getMessage());

            return false;
        }

        try
        {
            // PHP socket looks weired. we have to check the implementation.
            while ($written < $length)
            {
                $nwrite = $this->write($buffer);

                if ($nwrite === false)
                {
                    // could not write messages to the socket.
                    // e.g) Resource temporarily unavailable
                    throw new Exception("could not write message");
                }
                else if ($nwrite === '')
                {
                    // sometimes fwrite returns null string.
                    // probably connection aborted.
                    throw new Exception("connection aborted");
                }
                else if ($nwrite === 0)
                {
                    if (!$this->get_option("retry_socket", true))
                    {
                        $this->process_error($tag, $data, "could not send entities");

                        return false;
                    }

                    if ($retry > $this->get_option("max_write_retry", self::MAX_WRITE_RETRY))
                    {
                        throw new Exception("failed fwrite retry: retry count exceeds limit.");
                    }

                    $errors = error_get_last();
                    if ($errors)
                    {
                        if (isset($errors['message']) && strpos($errors['message'], 'errno=32 ') !== false)
                        {
                            /* breaking pipes: we have to close socket manually */
                            $this->close();
                            $this->reconnect();

                            # 断开后重新连上后从头开始写,避免出现 incoming chunk is broken 的错误问题
                          $written = 0;
                            $buffer = $packed;
                            continue;
                        }
                        else if (isset($errors['message']) && strpos($errors['message'], 'errno=11 ') !== false)
                        {
                            // we can ignore EAGAIN message. just retry.
                        }
                        else
                        {
                            error_log("unhandled error detected. please report this issue to http://github.com/fluent/fluent-logger-php/issues: ". var_export($errors, true));
                        }
                    }

                    if ($this->get_option('backoff_mode', self::BACKOFF_TYPE_EXPONENTIAL) == self::BACKOFF_TYPE_EXPONENTIAL)
                    {
                        $this->backoff_exponential(3, $retry);
                    }
                    else
                    {
                        usleep($this->get_option("usleep_wait", self::USLEEP_WAIT));
                    }
                    $retry++;
                    continue;
                }

                $written += $nwrite;
                $buffer   = substr($packed, $written);
            }

            if ($ack)
            {
                $rs = @fread($this->socket, 25);
                if ($rs)
                {
                    $rs = @json_decode($rs, true);
                    if ($rs && isset($rs['ack']))
                    {
                        if ($rs['ack'] !== $ack_key)
                        {
                            $this->process_error($tag, $data, 'ack in response and chunk id in sent data are different');
                            return false;
                        }
                        else
                        {
                            return true;
                        }
                    }
                    else
                    {
                        return false;
                    }
                }
                else
                {
                    return false;
                }
            }
        }
        catch (Exception $e)
        {
            $this->close();
            $this->process_error($tag, $data, $e->getMessage());

            return false;
        }

        return true;
    }


    /**
     * write data
     *
     * @param string $data
     * @return mixed integer|false
     */

    protected function write($buffer)
    {
        // We handle fwrite error on postImpl block. ignore error message here.
        return @fwrite($this->socket, $buffer);
    }

    /**
     * create a connection to specified fluentd
     *
     * @throws \Exception
     */

    protected function connect()
    {
        $connect_options = STREAM_CLIENT_CONNECT;
        if ($this->get_option("persistent", false))
        {
            $connect_options |= STREAM_CLIENT_PERSISTENT;
        }

        // could not suppress warning without ini setting.
        // for now, we use error control operators.
        $socket = @stream_socket_client($this->transport, $errno, $errstr, $this->get_option("connection_timeout", self::CONNECTION_TIMEOUT), $connect_options);

        if (!$socket)
        {
            $errors = error_get_last();
            throw new Exception($errors['message']);
        }

        // set read / write timeout.
        stream_set_timeout($socket, $this->get_option("socket_timeout", self::SOCKET_TIMEOUT));

        $this->socket = $socket;
    }

    /**
     * create a connection if Fluent Logger hasn't a socket connection.
     *
     * @return void
     */

    protected function reconnect()
    {
        if (!is_resource($this->socket))
        {
            $this->connect();
        }
    }

    /**
     * close the socket
     *
     * @return void
     */

    public function close()
    {
        if (is_resource($this->socket))
        {
            fclose($this->socket);
        }

        $this->socket = null;
    }

    /**
     * get specified option's value
     *
     * @param      $key
     * @param null $default
     * @return mixed
     */

    protected function get_option($key, $default = null)
    {
        $result = $default;
        if (isset($this->options[$key]))
        {
            $result = $this->options[$key];
        }

        return $result;
    }

    /**
     * backoff exponential sleep
     *
     * @param $base int
     * @param $attempt int
     */

    protected function backoff_exponential($base, $attempt)
    {
        usleep(pow($base, $attempt) * 1000);
    }

    /**
     * 处理错误
     *
     * @param $tag
     * @param $data
     * @param $error
     */

    protected function process_error($tag, $data, $error)
    {
        error_log(sprintf("%s %s: %s", $error, $tag, json_encode($data)));
    }

    protected static function json_encode(array $data)
    {
        try
        {
            // 解决使用 JSON_UNESCAPED_UNICODE 偶尔会出现编码问题导致json报错
            return defined('JSON_UNESCAPED_UNICODE') ? json_encode($data, JSON_UNESCAPED_UNICODE) : json_encode($data);
        }
        catch (Exception $e)
        {
            return json_encode($data);
        }
    }
}