有图有真相,先来看几张成果图:
我先介绍下这些东西都是做什么的
- Grafana 是一个非常漂亮的可以自定义的监控web服务,如上图
- InfluxDB 是一个高效的时间数据库,擅长用于记录按时间推进的数据,这里的数据都存在它里面
- Fluentd 是一个数据(日志)接受和分发的服务,可以指定任意输入源后经过适当的处理后再分发到其它的接收端,这里收集数据都是通过它进入到 InfluxDB 的
- pidstat 直接安装 sysstat 就有了,可以查看进程相关数据
【安装】
这里不赘述,只简单的说下(其实都很简单)
- Grafana 参考 http://docs.grafana.org/installation/rpm/
- InfluxDB 参考 https://influxdata.com/downloads/#influxdb
- Fluentd 参考 http://www.fluentd.org/download
这里简单说下,如果是centos7,可直接点击 64-bit (RHEL7, RHEL6, RHEL5) 的 RHEL7 里进入,然后有很多RPM列表,复制最新的URL,然后yum install http://packages.treasuredata.com.s3.amazonaws.com/2/redhat/7/x86_64/td-agent-2.3.1-0.el7.x86_64.rpm这样就可以了
- pidstat 只要
yum install sysstat
后就有了
确定以上程序都安装好后,接下来就是进行配置了,Grafana 和 InfluxDB 实际上也没什么要配置的,装好后直接启动吧,启动命令
systemctl start grafana-server
开启开机自动启动:
systemctl enable grafana-server.service
【InfluxDB 设置】
启动后,就可以通过 http://serverip:8083/ 访问在线的InfluxDB管理功能了。
在Query栏里输入:
CREATE DATABASE grafana WITH DURATION 2d REPLICATION 5
第一行是创建一个 grafana 的帐号和密码
第二行是创建一个 grafana 库,并且默认保留数据2d是2天(m表示月)的意思,可以自行设置,REPLICATION 5 是分5个片,如果想永久保留把 DURATION 2d 去掉就可以了,这些后期都可以修改
【Grafana 设置】
http://serverip:3000/ 是 Grafana 的管理界面。
进入 Grafana 页面(默认帐号是 admin/admin)点击 Data Sources 连接,在上面点击 Add new 连接,type选择 InfluxDB0.9.x(虽然我装的是0.12但是只有这个,选这个可以用)URL 填 http://serverip:8086/ 这个 8086 端口就是 influxdb 的默认http接口端口。注意,Access 要选择 proxy。
下面的 InfluxDB Details 全部填 grafana 就好了。
【Fluentd 设置】
InfluxDB 的源添加好了,现在开始收集数据吧。收集数据我用到了 fluentd 进行分发。默认装好后就有
命令了,此时,需要安装2个插件:
td-agent-gem install fluent-plugin-rewrite-tag-filter
安装成功后,将下面我修改的influx插件文件放在
require 'date'
require 'influxdb'
class Fluent::InfluxOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('influx', self)
include Fluent::HandleTagNameMixin
config_param :host, :string, :default => 'localhost',
:desc => "The IP or domain of influxDB."
config_param :port, :integer, :default => 8086,
:desc => "The HTTP port of influxDB."
config_param :dbname, :string, :default => 'fluentd',
:desc => <<-DESC
The database name of influxDB.
You should create the database and grant permissions at first.
DESC
config_param :user, :string, :default => 'root',
:desc => "The DB user of influxDB, should be created manually."
config_param :password, :string, :default => 'root', :secret => true,
:desc => "The password of the user."
config_param :time_precision, :string, :default => 's',
:desc => <<-DESC
The time precision of timestamp.
You should specify either hour (h), minutes (m), second (s),
millisecond (ms), microsecond (u), or nanosecond (n).
DESC
config_param :use_ssl, :bool, :default => false,
:desc => "Use SSL when connecting to influxDB."
config_param :tag_keys, :array, :default => [],
:desc => "The names of the keys to use as influxDB tags."
config_param :sequence_tag, :string, :default => nil,
:desc => <<-DESC
The name of the tag whose value is incremented for the consecutive simultaneous
events and reset to zero for a new event with the different timestamp.
DESC
config_param :uniqu, :bool, :default => true
def initialize
super
@seq = 0
end
def configure(conf)
super
@uniqu = conf['uniqu']
@influxdb = InfluxDB::Client.new @dbname, host: @host,
port: @port,
username: @user,
password: @password,
async: false,
time_precision: "n",
# time_precision: @time_precision,
use_ssl: @use_ssl
end
def start
super
end
FORMATTED_RESULT_FOR_INVALID_RECORD = ''.freeze
def format(tag, time, record)
# TODO: Use tag based chunk separation for more reliability
if record.empty? || record.has_value?(nil)
FORMATTED_RESULT_FOR_INVALID_RECORD
else
[tag, time, record].to_msgpack
end
end
def shutdown
super
end
def write(chunk)
points = []
# 自动调回0
if @seq > 1000000
@seq = 0
end
chunk.msgpack_each do |tag, time, record|
timestamp = record.delete('time') || time
# 使用参数里的毫秒
if record["microtime"] && record["microtime"] > timestamp
timestamp = record["microtime"]
end
if @uniqu
timestamp = (timestamp * 1000000000).to_i
else
@seq += 1
# 这里强制 time_precision 参数设定成 n, 所以乘以1000000000
timestamp = (timestamp * 1000000000).to_i + @seq
end
if tag_keys.empty?
values = record
tags = {}
else
values = {}
tags = {}
record.each_pair do |k, v|
if @tag_keys.include?(k)
tags[k] = v
else
values[k] = v
end
end
end
point = {
:timestamp => timestamp,
:series => tag,
:values => values,
:tags => tags,
}
points << point
end
@influxdb.write_points(points)
end
end
插件保存后,添加一个 td-agent(fluentd)的配置,将以下文件放在
文件中
<source>
type tcp
port 5170
tag collect.log
source_host_key client
types uid:integer,pid:integer,usr:float,system:float,guest:float,cpu:float,cpu_num:integer
format /^(?<time>[^ ]+ [P|A]M)[ ]+(?<uid>\d+)?[ ]*(?<pid>\d+)[ ]+(?<usr>[^ ]+)[ ]+(?<system>[^ ]+)[ ]+(?<guest>[^ ]*)[ ]+(?<cpu>[^ ]*)[ ]+(?<cpu_num>[\d]+)[ ]+(?<command>.*)$/
</source>
# 内存
<source>
type tcp
port 5171
tag collect.log
source_host_key client
types uid:integer,pid:integer,minflt:float,majflt:float,vsz:integer,rss:integer,mem:float
format /^(?<time>[^ ]+ [P|A]M)[ ]+(?<uid>\d+)?[ ]*(?<pid>\d+)[ ]+(?<minflt>[^ ]+)[ ]+(?<majflt>[^ ]+)[ ]+(?<vsz>[^ ]+)[ ]+(?<rss>[^ ]+)[ ]+(?<mem>[^ ]+)[ ]+(?<command>.*)$/
</source>
# IO
<source>
type tcp
port 5172
tag collect.log
source_host_key client
types uid:integer,pid:integer,kprd:float,kbwr:float,kbccwr:float
format /^(?<time>[^ ]+ [P|A]M)[ ]+(?<uid>\d+)?[ ]*(?<pid>\d+)[ ]+(?<kprd>[^ ]+)[ ]+(?<kbwr>[^ ]+)[ ]+(?<kbccwr>[^ ]+)[ ]+(?<command>.*)$/
</source>
<match collect.log>
type rewrite_tag_filter
rewriterule1 command ^influxd$ monitor.influxd
rewriterule2 command ^java$ monitor.elasticsearch
rewriterule3 command ^td-agent$ monitor.td-agent
rewriterule4 command ^td-agent-(.*)$ monitor.td-agent-$1
rewriterule5 command ^([^.]+)$ monitor.$1
</match>
<match monitor.*>
type influx
host 10.1.37.3
port 8086
dbname grafana
user influxdb
password influxdb
remove_tag_prefix monitor.
flush_interval 1
tag_keys ["pid", "parent_pid", "command", "client"]
uniqu false
retry_wait 3
buffer_type memory
buffer_queue_limit 10
buffer_chunk_limit 1m
retry_limit 5
</match>
然后使用
启动即可,如果需要放后台启动可以加 -d 参数,如果需要记录log,则可以加参数 -o logpath.log 完整的如下:
它监听了3个tcp端口,分别用来收集cpu,内存,io的。
看看是否成功启动了
【收集指定程序的数据】
成功启动后,接下来就是去需要收集进程的其它服务器上操作了,这边需要用到我写的另外一个bash文件,文件保存在
文件里(如果是别的文件名,则请修改下代码里的 “monitor-collect.sh” 为同名文件),另外,注意修改第4行的IP为 Fluentd 运行的服务器IP
# 监控收集脚本
# 请修改成自已实际机器的IP
IP=127.0.0.1
MYNAME=$1
if [ "$MYNAME" = "" ]; then
echo "缺少1个参数,请加关键字"
echo "参数为进程匹配grep -E 关键字。使用方法如下: "
echo " monitor-collect.sh influxd"
echo " monitor-collect.sh td-agent mysql 表示匹配 td-agent mysql"
echo " monitor-collect.sh -p 123,456 表示只读123,456这2个进程"
exit
fi
if [ "$1" = "-p" ]; then
if [ "$2" = "" ]; then
echo "-p 参数后必须指定进程ID,多个用,隔开"
exit
fi
fi
PID1=0
PID2=0
PID3=0
HAVE_PID1=""
HAVE_PID2=""
HAVE_PID3=""
RUN=true
function TaskClean()
{
echo "Now do task clean..."
RUN=false
}
function TaskExit()
{
if [ $PID1 > 0 ]; then
echo "Now exit pidstat1: $PID1"
kill $PID1
echo "done."
fi
if [ $PID2 > 0 ]; then
echo "Now exit pidstat2: $PID2"
kill $PID2
echo "done."
fi
if [ $PID3 > 0 ]; then
echo "Now exit pidstat3: $PID3"
kill $PID3
echo "done."
fi
}
trap 'TaskClean; exit' SIGINT
trap 'TaskExit; exit' EXIT
echo "collect script pid: $$";
LANG=en_US.UTF-8
while $RUN; do
if [ "$1" = "-p" ]; then
PIDS=$2
else
PIDS=$(ps -eo pid,ppid,user,pcpu,command | grep -v '/bin/sh -c' | grep -v 'monitor-collect.sh' | grep -E "$MYNAME" | grep -v 'grep' | gawk '{print $1}')
PIDS=$(echo $PIDS | sed -e s'/ /,/g')
fi
if [ "$PIDS" != "" ]; then
echo "found pids: $PIDS"
# CPU
if [ "$HAVE_PID1" = "" ]; then
pidstat -p $PIDS 1 > /dev/tcp/$IP/5170 &
PID1=$!
echo "pidstat pid: $PID1"
fi
# 内存
if [ "$HAVE_PID2" = "" ]; then
pidstat -r -p $PIDS 1 > /dev/tcp/$IP/5171 &
PID2=$!
echo "pidstat pid: $PID2"
fi
# IO
if [ "$HAVE_PID3" = "" ]; then
pidstat -d -p $PIDS 1 > /dev/tcp/$IP/5172 &
PID3=$!
echo "pidstat pid: $PID3"
fi
# 检查进程是否退出
while true; do
HAVE_PID1=$(ps $PID1 | awk '{ print $1 }' | grep $PID1)
HAVE_PID2=$(ps $PID2 | awk '{ print $1 }' | grep $PID2)
HAVE_PID3=$(ps $PID3 | awk '{ print $1 }' | grep $PID3)
if [ "$HAVE_PID1" = "" ]; then
echo "pidstat1 is exit. now restart"
break
fi
if [ "$HAVE_PID2" = "" ]; then
echo "pidstat2 is exit. now restart"
break
fi
if [ "$HAVE_PID3" = "" ]; then
echo "pidstat3 is exit. now restart"
break
fi
sleep 3
done
fi
sleep 1
done
保存好后,记得增加可执行权限:
这个脚本的用法很简单,比如你现在需要收集 mysql 的数据,只要运行如下代码即可:
如果需要放在后台运行,则这样运行
如果要同时监控 mysql,php-fpm,mongodb 的,则可以这样
也可以监听指定进程id:
表示监听123 和 456 两个进程。
需要停掉收集程序,直接
得到进程id后直接kill即可。
【确保数据是否都推送到了 InfluxDB 里了】
可以在网页版里,也可以在 influx 终端里执行
类似出现下面的内容则表示收集来了对应的进程数据
------------------
name
elasticsearch
influxd
mongod
再
查看最新10秒的数据,如果有的话则推送过来了,如果没有,请查看 fluntd 的log。
【Grafana 添加监控项目】
Grafana这个工具对于新手来说还是很难用的(至少我一开始不知道怎么下手),所以这边就简单的普及下吧。
先添加一个Dashboard,然后按 ctrl + h 键可以调出/隐藏绿色的控制按钮,点那个绿色的小条条(这设计的真是无语)会出来一个菜单,然后选 Add Panel-> Graph,会出来一个对话框,General 就是常规设置,这个名字就好了。
Metrics里添加一个查询,From选择对应的库,比如 mysql(这里它会自动把有的列出来,很人性化),添加一个WHERE,选择client值是对应服务器IP
SLECEC 选择一个字段(比如cpu)
GROUP BY 选择按1s配置,如下图
如果是CPU项目,则在 Axes & Grid 标签里,Left Y 的 Unit 选择 percent(0-100)
在 Display Styles 标签里,Chart Options 参数可以选择 Bars,如下图:
修改好后,点右边的 “Back to dashboard”,我一开始都不知道这里点,晕呀。下次要修改,只要点击标题,就有菜单可以修改了。
所有修改完毕后,记得点上面的保存图标。