利用 Grafana influxDB fluentd pidstat 对指定进程进行监控

有图有真相,先来看几张成果图:

QQ20160407-1@2x

QQ20160407-0@2x

我先介绍下这些东西都是做什么的

  • Grafana 是一个非常漂亮的可以自定义的监控web服务,如上图
  • InfluxDB 是一个高效的时间数据库,擅长用于记录按时间推进的数据,这里的数据都存在它里面
  • Fluentd 是一个数据(日志)接受和分发的服务,可以指定任意输入源后经过适当的处理后再分发到其它的接收端,这里收集数据都是通过它进入到 InfluxDB 的
  • pidstat 直接安装 sysstat 就有了,可以查看进程相关数据
【安装】

这里不赘述,只简单的说下(其实都很简单)

 

确定以上程序都安装好后,接下来就是进行配置了,Grafana 和 InfluxDB 实际上也没什么要配置的,装好后直接启动吧,启动命令

systemctl start influxd
systemctl start grafana-server

开启开机自动启动:

systemctl enable influxd.service
systemctl enable grafana-server.service
【InfluxDB 设置】

启动后,就可以通过 http://serverip:8083/ 访问在线的InfluxDB管理功能了。
在Query栏里输入:

CREATE USER "grafana" WITH PASSWORD 'grafana'
CREATE DATABASE grafana WITH DURATION 2d REPLICATION 5

第一行是创建一个 grafana 的帐号和密码
第二行是创建一个 grafana 库,并且默认保留数据2d是2天(m表示月)的意思,可以自行设置,REPLICATION 5 是分5个片,如果想永久保留把 DURATION 2d 去掉就可以了,这些后期都可以修改

【Grafana 设置】

http://serverip:3000/ 是 Grafana 的管理界面。

进入 Grafana 页面(默认帐号是 admin/admin)点击 Data Sources 连接,在上面点击 Add new 连接,type选择 InfluxDB0.9.x(虽然我装的是0.12但是只有这个,选这个可以用)URL 填 http://serverip:8086/ 这个 8086 端口就是 influxdb 的默认http接口端口。注意,Access 要选择 proxy。
下面的 InfluxDB Details 全部填 grafana 就好了。

【Fluentd 设置】

InfluxDB 的源添加好了,现在开始收集数据吧。收集数据我用到了 fluentd 进行分发。默认装好后就有

td-agent

命令了,此时,需要安装2个插件:

td-agent-gem install fluent-plugin-influxdb
td-agent-gem install fluent-plugin-rewrite-tag-filter

安装成功后,将下面我修改的influx插件文件放在 /etc/td-agent/plugin/out_influx.rb 文件中(是用官方的改的,解决了些细节问题)

# encoding: UTF-8
require 'date'
require 'influxdb'

class Fluent::InfluxOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('influx', self)

  include Fluent::HandleTagNameMixin

  config_param :host, :string,  :default => 'localhost',
               :desc => "The IP or domain of influxDB."
  config_param :port, :integer,  :default => 8086,
               :desc => "The HTTP port of influxDB."
  config_param :dbname, :string,  :default => 'fluentd',
               :desc => <<-DESC
The database name of influxDB.
You should create the database and grant permissions at first.
DESC
  config_param :user, :string,  :default => 'root',
               :desc => "The DB user of influxDB, should be created manually."
  config_param :password, :string,  :default => 'root', :secret => true,
               :desc => "The password of the user."
  config_param :time_precision, :string, :default => 's',
               :desc => <<-DESC
The time precision of timestamp.
You should specify either hour (h), minutes (m), second (s),
millisecond (ms), microsecond (u), or nanosecond (n).
DESC
  config_param :use_ssl, :bool, :default => false,
               :desc => "Use SSL when connecting to influxDB."
  config_param :tag_keys, :array, :default => [],
               :desc => "The names of the keys to use as influxDB tags."
  config_param :sequence_tag, :string, :default => nil,
               :desc => <<-DESC
The name of the tag whose value is incremented for the consecutive simultaneous
events and reset to zero for a new event with the different timestamp.
DESC
  config_param :uniqu, :bool, :default => true


  def initialize
    super
    @seq = 0
  end

  def configure(conf)
    super
    @uniqu = conf['uniqu']
    @influxdb = InfluxDB::Client.new @dbname, host: @host,
                                              port: @port,
                                              username: @user,
                                              password: @password,
                                              async: false,
                                              time_precision: "n",
                                              # time_precision: @time_precision,
                                              use_ssl: @use_ssl
  end

  def start
    super
  end

  FORMATTED_RESULT_FOR_INVALID_RECORD = ''.freeze

  def format(tag, time, record)
    # TODO: Use tag based chunk separation for more reliability
    if record.empty? || record.has_value?(nil)
      FORMATTED_RESULT_FOR_INVALID_RECORD
    else
      [tag, time, record].to_msgpack
    end
  end

  def shutdown
    super
  end

  def write(chunk)
    points = []

    # 自动调回0
    if @seq > 1000000
      @seq = 0
    end

    chunk.msgpack_each do |tag, time, record|
      timestamp = record.delete('time') || time

      # 使用参数里的毫秒
      if record["microtime"] && record["microtime"] > timestamp
        timestamp = record["microtime"]
      end

      if @uniqu
        timestamp = (timestamp * 1000000000).to_i
      else
        @seq += 1
        # 这里强制 time_precision 参数设定成 n, 所以乘以1000000000
        timestamp = (timestamp * 1000000000).to_i + @seq
      end

      if tag_keys.empty?
        values = record
        tags = {}
      else
        values = {}
        tags = {}
        record.each_pair do |k, v|
          if @tag_keys.include?(k)
            tags[k] = v
          else
            values[k] = v
          end
        end
      end

      point = {
        :timestamp => timestamp,
        :series    => tag,
        :values    => values,
        :tags      => tags,
      }
      points << point
    end

    @influxdb.write_points(points)
  end
end

插件保存后,添加一个 td-agent(fluentd)的配置,将以下文件放在

/etc/td-agent/monitor-collect.conf

文件中

# CPU
<source>
  type tcp
  port 5170
  tag collect.log
  source_host_key client
  types uid:integer,pid:integer,usr:float,system:float,guest:float,cpu:float,cpu_num:integer
  format /^(?<time>[^ ]+ [P|A]M)[ ]+(?<uid>\d+)?[ ]*(?<pid>\d+)[ ]+(?<usr>[^ ]+)[ ]+(?<system>[^ ]+)[ ]+(?<guest>[^ ]*)[ ]+(?<cpu>[^ ]*)[ ]+(?<cpu_num>[\d]+)[ ]+(?<command>.*)$/
</source>

# 内存
<source>
  type tcp
  port 5171
  tag collect.log
  source_host_key client
  types uid:integer,pid:integer,minflt:float,majflt:float,vsz:integer,rss:integer,mem:float
  format /^(?<time>[^ ]+ [P|A]M)[ ]+(?<uid>\d+)?[ ]*(?<pid>\d+)[ ]+(?<minflt>[^ ]+)[ ]+(?<majflt>[^ ]+)[ ]+(?<vsz>[^ ]+)[ ]+(?<rss>[^ ]+)[ ]+(?<mem>[^ ]+)[ ]+(?<command>.*)$/
</source>

# IO
<source>
  type tcp
  port 5172
  tag collect.log
  source_host_key client
  types uid:integer,pid:integer,kprd:float,kbwr:float,kbccwr:float
  format /^(?<time>[^ ]+ [P|A]M)[ ]+(?<uid>\d+)?[ ]*(?<pid>\d+)[ ]+(?<kprd>[^ ]+)[ ]+(?<kbwr>[^ ]+)[ ]+(?<kbccwr>[^ ]+)[ ]+(?<command>.*)$/
</source>


<match collect.log>
  type rewrite_tag_filter
  rewriterule1 command ^influxd$ monitor.influxd
  rewriterule2 command ^java$ monitor.elasticsearch
  rewriterule3 command ^td-agent$ monitor.td-agent
  rewriterule4 command ^td-agent-(.*)$ monitor.td-agent-$1
  rewriterule5 command ^([^.]+)$ monitor.$1
</match>


<match monitor.*>
  type influx
  host 10.1.37.3
  port 8086
  dbname grafana
  user influxdb
  password influxdb
  remove_tag_prefix monitor.
  flush_interval 1
  tag_keys ["pid", "parent_pid", "command", "client"]
  uniqu false

  retry_wait 3
  buffer_type memory
  buffer_queue_limit 10
  buffer_chunk_limit 1m
  retry_limit 5
</match>

然后使用

td-agent -c /etc/td-agent/monitor-collect.conf

启动即可,如果需要放后台启动可以加 -d 参数,如果需要记录log,则可以加参数 -o logpath.log 完整的如下:

td-agent -d -c /etc/td-agent/monitor-collect.conf -o /var/log/td-agent/monitor-collsole.log

它监听了3个tcp端口,分别用来收集cpu,内存,io的。

tail -f /var/log/td-agent/monitor-collect.log

看看是否成功启动了

【收集指定程序的数据】

成功启动后,接下来就是去需要收集进程的其它服务器上操作了,这边需要用到我写的另外一个bash文件,文件保存在

/usr/local/bin/monitor-collect.sh

文件里(如果是别的文件名,则请修改下代码里的 “monitor-collect.sh” 为同名文件),另外,注意修改第4行的IP为 Fluentd 运行的服务器IP

#!/bin/bash
# 监控收集脚本

# 请修改成自已实际机器的IP
IP=127.0.0.1

MYNAME=$1

if [ "$MYNAME" = "" ]; then
  echo "缺少1个参数,请加关键字"
  echo "参数为进程匹配grep -E 关键字。使用方法如下: "
  echo "    monitor-collect.sh influxd"
  echo "    monitor-collect.sh td-agent mysql  表示匹配 td-agent mysql"
  echo "    monitor-collect.sh -p 123,456      表示只读123,456这2个进程"
  exit
fi

if [ "$1" = "-p" ]; then
  if [ "$2" = "" ]; then
    echo "-p 参数后必须指定进程ID,多个用,隔开"
    exit
  fi
fi

PID1=0
PID2=0
PID3=0
HAVE_PID1=""
HAVE_PID2=""
HAVE_PID3=""
RUN=true

function TaskClean()
{
  echo "Now do task clean..."
  RUN=false
}

function TaskExit()
{
  if [ $PID1 > 0 ]; then
    echo "Now exit pidstat1: $PID1"
    kill $PID1
    echo "done."
  fi
  if [ $PID2 > 0 ]; then
    echo "Now exit pidstat2: $PID2"
    kill $PID2
    echo "done."
  fi
  if [ $PID3 > 0 ]; then
    echo "Now exit pidstat3: $PID3"
    kill $PID3
    echo "done."
  fi
}

trap 'TaskClean; exit' SIGINT
trap 'TaskExit; exit' EXIT

echo "collect script pid: $$";

LANG=en_US.UTF-8
while $RUN; do
  if [ "$1" = "-p" ]; then
    PIDS=$2
  else
    PIDS=$(ps -eo pid,ppid,user,pcpu,command | grep -v '/bin/sh -c' | grep -v 'monitor-collect.sh' | grep -E "$MYNAME" | grep -v 'grep' | gawk '{print $1}')
    PIDS=$(echo $PIDS | sed -e s'/ /,/g')
  fi

  if [ "$PIDS" != "" ]; then
    echo "found pids: $PIDS"

    # CPU
    if [ "$HAVE_PID1" = "" ]; then
      pidstat -p $PIDS 1 > /dev/tcp/$IP/5170 &
      PID1=$!
      echo "pidstat pid: $PID1"
    fi

    # 内存
    if [ "$HAVE_PID2" = "" ]; then
      pidstat -r -p $PIDS 1 > /dev/tcp/$IP/5171 &
      PID2=$!
      echo "pidstat pid: $PID2"
    fi

    # IO
    if [ "$HAVE_PID3" = "" ]; then
      pidstat -d -p $PIDS 1 > /dev/tcp/$IP/5172 &
      PID3=$!
      echo "pidstat pid: $PID3"
    fi

    # 检查进程是否退出
    while true; do
      HAVE_PID1=$(ps $PID1 | awk '{ print $1 }' | grep $PID1)
      HAVE_PID2=$(ps $PID2 | awk '{ print $1 }' | grep $PID2)
      HAVE_PID3=$(ps $PID3 | awk '{ print $1 }' | grep $PID3)

      if [ "$HAVE_PID1" = "" ]; then
        echo "pidstat1 is exit. now restart"
        break
      fi

      if [ "$HAVE_PID2" = "" ]; then
        echo "pidstat2 is exit. now restart"
        break
      fi

      if [ "$HAVE_PID3" = "" ]; then
        echo "pidstat3 is exit. now restart"
        break
      fi
      sleep 3
    done
  fi
  sleep 1
done

保存好后,记得增加可执行权限:

chmod +x /usr/local/bin/monitor-collect.sh

这个脚本的用法很简单,比如你现在需要收集 mysql 的数据,只要运行如下代码即可:

monitor-collect.sh mysql

如果需要放在后台运行,则这样运行

nohup monitor-collect.sh mysql &

如果要同时监控 mysql,php-fpm,mongodb 的,则可以这样

nohup monitor-collect.sh "mysql|php-fpm|mongod" &

也可以监听指定进程id:

nohup monitor-collect.sh -p 123,456 &

表示监听123 和 456 两个进程。

需要停掉收集程序,直接

ps -ef | grep monitor-collect.sh

得到进程id后直接kill即可。

【确保数据是否都推送到了 InfluxDB 里了】

可以在网页版里,也可以在 influx 终端里执行

use grafana
show measurements

类似出现下面的内容则表示收集来了对应的进程数据

name: measurements
------------------
name
elasticsearch
influxd
mongod

select * from mongod where time > now() - 10s

查看最新10秒的数据,如果有的话则推送过来了,如果没有,请查看 fluntd 的log。

【Grafana 添加监控项目】

Grafana这个工具对于新手来说还是很难用的(至少我一开始不知道怎么下手),所以这边就简单的普及下吧。

先添加一个Dashboard,然后按 ctrl + h 键可以调出/隐藏绿色的控制按钮,点那个绿色的小条条(这设计的真是无语)会出来一个菜单,然后选 Add Panel-> Graph,会出来一个对话框,General 就是常规设置,这个名字就好了。
Metrics里添加一个查询,From选择对应的库,比如 mysql(这里它会自动把有的列出来,很人性化),添加一个WHERE,选择client值是对应服务器IP
SLECEC 选择一个字段(比如cpu)
GROUP BY 选择按1s配置,如下图

QQ20160407-2@2x

如果是CPU项目,则在 Axes & Grid 标签里,Left Y 的 Unit 选择 percent(0-100)
在 Display Styles 标签里,Chart Options 参数可以选择 Bars,如下图:
QQ20160407-3@2x

修改好后,点右边的 “Back to dashboard”,我一开始都不知道这里点,晕呀。下次要修改,只要点击标题,就有菜单可以修改了。

所有修改完毕后,记得点上面的保存图标。

Comments are closed.