Redis 提供了一種 HyperLogLog 類型,主要用於大數據(極限 2 ^ 64)但準確度要求不是很苛刻的基數統計或去重判斷(標準誤差 0.81%),處理速度超快(每秒數萬次),並且稠密結構最多隻佔用 12K + 16 字節內存空間(16 字節為頭部)。

官方文檔:

https://redis.io/docs/latest/develop/data-types/probabilistic/hyperloglogs/


HyperLogLog 基本原理(編碼作為字符串存儲,可以按字符串讀寫)。

源代碼位置:src/hyperloglog.c


分成兩部分(每字節8位)
  1. hash換算無符號整數8字節(64位,14位用於對應16K寄存器,50位用來計數)。
  2. 存儲12K字節的寄存器(寄存器是6位長度,即按6位算是16K個寄存器)。

注:14位即 2^14 = 16K = 16*1024 。

注:12K字節 = 12 * 1024 * 8 位 = 16K * 6 位 = 16 * 1024 * 6 位


概率數據結構
  • 通過hash換算長度為 2^64 位以內的字符串為一個64位無符號整數,低14位是寄存器編號,高50位統計其二進制數尾部連續0個數。
  • 通過寄存器編號定位對應的6位寄存器,按條件寫入連續0的個數(當寄存器中已有的值 >= 新值時認為已經存在而跳過,反之認為不存在而寫入,即寄存器始終保留最大值)。
  • 把每個寄存器中的值(6位,取值範圍 0 ~ 63,共 2 ^ 6 種)作為索引填充到 int[64] 數組中,命中的索引值對應的數組值 +1
  • 隨機概率估算得出基數


Redis 處理 HyperLogLog 分兩種模式
  • 稀疏結構,當寄存器數據佔用的字節數 < hll-sparse-max-bytes(redis.conf 中配置,默認: 3000字節,大概3K空間)時使用,用於較小基數的估算。稀疏結構高效,佔用空間動態擴大,超過該閾值後會自動轉換為稠密結構,建議 hll-sparse-max-bytes 限制在3000字節以內。
  • 稠密結構,固定大小 12K 字節空間。


核心思路

數據具有隨機性,通過固定特徵(hash值尾部0的個數最大值)分佈到 16K 寄存器中(減少特徵稀釋),再通過隨機概率算法彙總估算即可得到相近值。


Nginx

nginx日誌格式需要調整,打開nginx.conf配置文件。

增加以下配置。

# Log format used by the statistics script. Compared with the stock "combined"
# format it adds "$host", $request_length and $bytes_sent so that traffic in
# bytes can be accounted per domain. The PHP collector parses lines with this
# exact layout, so both definitions must stay in sync.
log_format  main '$remote_addr - $remote_user [$time_local] "$request" "$host" $status $request_length $bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';

# Add inside the server block: one access log per requested host name.
# NOTE(review): per the nginx log module documentation, a log path containing
# variables is opened and closed on every write unless open_log_file_cache is
# configured, and $host is derived from the request — confirm this is acceptable.
access_log logs/$host-access.log main;


PHP

先安裝redis客户端擴展

下載擴展源碼包 https://pecl.php.net/package/redis

# Example: build and install the phpredis extension from source.

# PHP installation prefix
INSTALL_PATH=/usr/local/php

# Build prerequisites: phpize needs autoconf, the extension is compiled with gcc/make,
# and the PECL package is a gzip-compressed tarball (not a zip archive).
yum install -y autoconf gcc make tar

# Save the package to disk; "-O -" would only stream it to stdout.
# Certificate verification is left enabled on purpose.
wget -q -O redis-6.3.0.tgz "https://pecl.php.net/get/redis-6.3.0.tgz"
tar -xzf redis-6.3.0.tgz

cd redis-6.3.0
# Generate the configure script for this PHP installation
"$INSTALL_PATH/bin/phpize"

./configure --with-php-config="$INSTALL_PATH/bin/php-config"

# Compile the extension and copy redis.so into PHP's extension directory
make && make install

# Enable the extension
echo "extension=redis.so" >> "$INSTALL_PATH/lib/php.ini"



php統計代碼

保存位置: /www/www-resource/nginx-log.php

<?php

/**
 * nginx access-log statistics collector.
 *
 * Tails nginx access logs written in a known log_format, aggregates
 * per-date / per-domain counters (requests, page views, IPs, UVs, 4xx/5xx,
 * spiders, traffic) and hands every batch to sync(). Read offsets are kept in
 * Redis so a restarted process resumes where it stopped; unique IP/UV counts
 * are estimated with Redis HyperLogLog.
 */
class NgnixLog {

    /**
     * Prefix of the Redis keys that store per-file read offsets.
     */
    private const CACHE_PREFIX = 'nginx-log-';

    /**
     * Lua script: adds ARGV[2..] to the HyperLogLog named ARGV[1], refreshes
     * its TTL to one day and returns how much the estimated cardinality grew.
     */
    private const UNIQUE_COUNT_SCRIPT = "local prevCount = redis.call('pfCount', ARGV[1]);redis.pcall('pfAdd', unpack(ARGV));redis.call('expire', ARGV[1], 86400);return redis.call('pfCount', ARGV[1]) - prevCount;";

    /**
     * @var \Redis
     */
    protected \Redis $redis;

    /**
     * Redis host.
     * @var string
     */
    protected string $redisHost;

    /**
     * Redis port.
     * @var int
     */
    protected int $redisPort;

    /**
     * Regex alternation of file extensions that count as a page view.
     * @var string
     */
    protected string $pageExt;

    /**
     * nginx log_format string the log lines are expected to follow.
     * @var string
     */
    protected string $format = '$remote_addr - $remote_user [$time_local] "$request" "$host" $status $request_length $bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';

    /**
     * Last processed byte offset, keyed by log file base name.
     * @var array
     */
    protected array $logpos = [];

    /**
     * Regular expression compiled from $format.
     * @var string
     */
    protected string $formatReg;

    /**
     * Variable names of $format, in capture-group order.
     * @var array
     */
    protected array $formatKeys;

    /**
     * nginx executable (path, or bare command name resolved through PATH).
     * @var string
     */
    protected string $nginxBin = 'nginx';

    /**
     * date() formats; every log line is counted once per format.
     * @var array
     */
    protected array $dateFormats = ['Y-m-d'];

    /**
     * Number of counted log lines per emitted batch.
     * @var int
     */
    protected int $eachLogLine = 500;

    /**
     * Whether the HyperLogLog Lua script is known to be loaded in Redis.
     * @var bool
     */
    protected bool $uniqueScriptLoaded = false;

    /**
     * Stores the connection settings and compiles the log format.
     * Terminates the process when the redis extension is missing or the
     * format contains no $variable placeholder.
     *
     * @param string|null $format nginx log_format; null/empty keeps the default
     * @param string $redisHost
     * @param int $redisPort
     * @param string $pageExt regex alternation of page extensions
     */
    public function __construct(?string $format = null, string $redisHost = '127.0.0.1', int $redisPort = 6379, string $pageExt = 'html|htm|php') {
        if ($format) {
            $this->format = $format;
        }
        if (!class_exists(\Redis::class)) {
            die('請安裝redis擴展!');
        }
        $this->redisHost = $redisHost;
        $this->redisPort = $redisPort;
        $this->pageExt = $pageExt;
        $this->compileFormat();
    }

    /**
     * Turns $this->format into $this->formatReg / $this->formatKeys.
     *
     * Every $variable becomes a lazy capture group and the literal text
     * between variables is matched verbatim, including the literal that
     * follows the last variable (otherwise the last field would swallow the
     * closing quote and the line break).
     */
    protected function compileFormat(): void {
        $literals = preg_split('#\$\w+#', $this->format);
        if (count($literals) < 2) {
            die('日誌格式沒有變量佔位符:' . $this->format);
        }
        preg_match_all('#\$(\w+)#', $this->format, $matches);
        $keys = $matches[1];
        $rule = '';
        foreach ($keys as $index => $_) {
            $rule .= preg_quote($literals[$index], '#') . '(.*?)';
        }
        // preg_split always yields one more literal than there are variables.
        $rule .= preg_quote($literals[count($keys)], '#');
        $this->formatReg = '#^' . $rule . '$#s';
        $this->formatKeys = $keys;
    }

    /**
     * Sets the nginx executable used to reload nginx after log rotation.
     * Terminates the process when the path is not an executable file.
     *
     * @param string $bin
     */
    public function setNginxBin(string $bin) {
        if (file_exists($bin) && is_executable($bin)) {
            // Stored as an absolute path because run() changes the working directory.
            $this->nginxBin = realpath($bin) ?: $bin;
        } else {
            die('請指定可執行的nginx運行路徑');
        }
    }

    /**
     * Sets the date() formats used as statistics dimensions.
     *
     * @param string $formats
     */
    public function setDateFormats(string ...$formats) {
        $this->dateFormats = $formats;
    }

    /**
     * Sets the number of counted lines per batch (at least 1).
     *
     * @param int $size
     */
    public function setEachLogLine(int $size) {
        $this->eachLogLine = max($size, 1);
    }

    /**
     * Main loop: every 20 seconds, process new lines of all matching log
     * files and rotate files whose processed offset reached $maxSize bytes.
     * Never returns normally.
     *
     * @param string $logDir directory containing the logs (becomes the cwd)
     * @param string $pattern glob pattern relative to $logDir
     * @param int $maxSize rotation threshold in bytes; <= 0 disables rotation
     * @throws Exception
     */
    public function run(string $logDir, string $pattern = './*-access.log', int $maxSize = 10 ** 7) {
        $this->newRedis();
        if (!is_dir($logDir)) {
            die('日誌目錄不存在!');
        }
        chdir($logDir);
        while (true) {
            foreach ($this->eachFile($pattern) as $file) {
                echo '提取日誌文件:' . $file . PHP_EOL;
                foreach ($this->eachFlow($file) as $data) {
                    $this->sync($data);
                }
                if ($maxSize > 0 && ($this->logpos[basename($file)] ?? 0) >= $maxSize) {
                    $this->compress($file);
                }
            }
            sleep(20);
        }
    }

    /**
     * Rotates a log file: moves it to ./zip-log, reloads nginx, processes the
     * remaining lines of the moved file, resets the stored offset, zips the
     * file and removes the uncompressed copy.
     *
     * @param string $logfile
     * @throws Exception when a step of the rotation fails
     */
    protected function compress(string $logfile) {
        echo '開始壓縮處理' . PHP_EOL;
        $zipDir = './zip-log';
        if (!is_dir($zipDir) && !mkdir($zipDir) && !is_dir($zipDir)) {
            throw new Exception('無法創建壓縮目錄:' . $zipDir);
        }
        $name = basename($logfile);
        $tmpfile = $zipDir . '/' . $name;
        if (file_exists($tmpfile)) {
            unlink($tmpfile);
        }
        if (!rename($logfile, $tmpfile)) {
            throw new Exception('無法移動日誌文件:' . $logfile);
        }
        // The configured executable is invoked directly; a bare command name is
        // still resolved through PATH by the shell.
        // NOTE(review): "-s reopen" is nginx's dedicated log-reopen signal; "reload"
        // is kept here to preserve the existing behavior.
        $nginx = escapeshellarg($this->nginxBin);
        system("{$nginx} -t && {$nginx} -s reload", $result_code);
        if ($result_code != 0) {
            throw new Exception('無法正常重新加載nginx配置數據');
        }
        // Same base name => same stored offset, so only unread lines are counted.
        foreach ($this->eachFlow($tmpfile) as $data) {
            $this->sync($data);
        }
        // The next file with this name starts from the beginning.
        $this->cache($name, 0);
        $this->logpos[$name] = 0;
        $zipFile = $zipDir . '/' . date('Y-m-d_H-i-s') . '-' . $name . '.zip';
        // File names derive from the request host, so they must be shell-escaped.
        system('zip ' . escapeshellarg($zipFile) . ' ' . escapeshellarg($tmpfile), $result_code);
        if ($result_code != 0) {
            throw new Exception('壓縮日誌文件失敗');
        }
        // The archive exists now; drop the uncompressed copy.
        unlink($tmpfile);
    }

    /**
     * Yields the paths matching a glob pattern (nothing when glob fails).
     *
     * @param string $pattern
     */
    protected function eachFile(string $pattern = './*-access.log') {
        yield from (glob($pattern) ?: []);
    }

    /**
     * Yields one associative array (format variable => value) per log line
     * that matches the compiled format, starting at byte offset $pos.
     *
     * $pos is updated after every line read, so at any point it holds the
     * offset just past the last consumed line.
     *
     * @param string $logfile
     * @param int|null $pos start offset, updated in place
     * @throws Exception when the file cannot be opened
     */
    protected function eachRead(string $logfile, ?int &$pos = null) {
        $f = fopen($logfile, 'r');
        if ($f === false) {
            throw new Exception('無法打開日誌文件:' . $logfile);
        }
        try {
            if ($pos) {
                $stat = fstat($f);
                if ($stat !== false && $pos > $stat['size']) {
                    // File is shorter than the stored offset (truncated or replaced): start over.
                    $pos = 0;
                } else {
                    fseek($f, $pos, SEEK_SET);
                }
            }
            while (($line = fgets($f, 10240)) !== false) {
                $pos = ftell($f);
                if (trim($line) == '') {
                    continue;
                }
                if (preg_match($this->formatReg, $line, $matches)) {
                    $data = [];
                    foreach ($this->formatKeys as $index => $key) {
                        $data[$key] = $matches[$index + 1];
                    }
                    yield $data;
                }
            }
            $pos = ftell($f);
        } finally {
            // Also runs when the consumer abandons the generator early.
            fclose($f);
        }
    }

    /**
     * Yields aggregated batches in which the raw ip/uv lists have been
     * replaced by the number of newly seen values, and persists the read
     * offset after the consumer has handled each batch.
     *
     * @param string $logfile
     */
    protected function eachFlow(string $logfile) {
        $name = basename($logfile);
        foreach ($this->eachCount($logfile) as $pos => $data) {
            foreach ($data as $date => $items) {
                foreach ($items as $domain => $item) {
                    // NOTE(review): ip and uv values are added to the same HyperLogLog key;
                    // confirm this sharing is intended.
                    $item['ip'] = $this->getUniqueCount(["{$date}|{$domain}", ...array_unique($item['ip'])]);
                    $item['uv'] = $this->getUniqueCount(["{$date}|{$domain}", ...array_unique($item['uv'])]);
                    $data[$date][$domain] = $item;
                }
            }
            yield $data;
            // Reached only after the consumer finished with the batch.
            $this->cache($name, $pos);
            $this->logpos[$name] = $pos;
        }
    }

    /**
     * Aggregates log lines into [date][domain] counters and yields
     * "offset => batch" every $eachLogLine counted lines. A final (possibly
     * empty) batch is yielded whenever the offset advanced past the last
     * yielded one, so skipped lines are not re-read on the next pass.
     *
     * @param string $logfile
     */
    protected function eachCount(string $logfile) {
        $name = basename($logfile);
        if (!isset($this->logpos[$name])) {
            $this->logpos[$name] = intval($this->cache($name) ?: 0);
        }
        $pos = $this->logpos[$name];
        $yieldedPos = $pos;
        $data = [];
        $num = 0;
        foreach ($this->eachRead($logfile, $pos) as $item) {
            if ($item['remote_addr'] == '127.0.0.1') {
                continue;
            }
            $num++;
            $req_size = (int) $item['request_length']; // request bytes
            $res_size = (int) $item['bytes_sent']; // response bytes
            $arr = explode(' ', $item['request']);
            $pv = 0;
            if (isset($arr[1])) {
                $path = parse_url($arr[1], PHP_URL_PATH) ?: '/';
                // A page view is a directory URL, a path without extension, or a page extension.
                // The extension is anchored to the end of the path so dots in directories are ignored.
                if (substr($path, -1) == '/' || !preg_match('#\.(\w+)$#', $path, $matches) || preg_match('/^(' . $this->pageExt . ')$/', $matches[1])) {
                    $pv = 1;
                }
            }
            $ip = $item['remote_addr'];
            $uv = md5($item['remote_addr'] . $item['http_user_agent']);
            $status = (int) $item['status'];
            $req_4xx = ($status >= 400 && $status < 500) ? 1 : 0;
            $req_5xx = ($status >= 500 && $status < 600) ? 1 : 0;
            $spider = preg_match('#(Baiduspider|Bytespider|360Spider|Sogou web spider|Sosospider|Googlebot|bingbot|AdsBot-Google|Google-Adwords|YoudaoBot|Yandex|DNSPod-Monitor|YisouSpider|mpcrawler)#', $item['http_user_agent']) ? 1 : 0;
            $domain = $item['host'];
            $timestamp = (int) strtotime($item['time_local']);
            foreach ($this->dateFormats as $dateFormat) {
                $date = date($dateFormat, $timestamp);
                if (empty($data[$date][$domain])) {
                    $data[$date][$domain] = [
                        'req' => 0,
                        'pv' => 0,
                        'ip' => [],
                        'uv' => [],
                        'req_4xx' => 0,
                        'req_5xx' => 0,
                        'spider' => 0, // spiders
                        'fake_spider' => 0, // fake spiders (never incremented here)
                        'req_size' => 0,
                        'res_size' => 0,
                    ];
                }
                $result = &$data[$date][$domain];
                $result['req']++;
                $result['req_size'] += $req_size;
                $result['res_size'] += $res_size;
                $result['pv'] += $pv;
                $result['ip'][] = $ip;
                $result['uv'][] = $uv;
                $result['req_4xx'] += $req_4xx;
                $result['req_5xx'] += $req_5xx;
                $result['spider'] += $spider;
                unset($result);
            }
            if ($num >= $this->eachLogLine) {
                yield $pos => $data;
                $yieldedPos = $pos;
                $data = [];
                $num = 0;
            }
        }
        if (count($data) || $pos !== $yieldedPos) {
            yield $pos => $data;
        }
    }

    /**
     * Adds values to a HyperLogLog and returns how much its estimated
     * cardinality grew.
     *
     * @param array $data first element is the key, the rest are the values
     * @return int|false result of the script call
     */
    protected function getUniqueCount(array $data) {
        $sha = sha1(self::UNIQUE_COUNT_SCRIPT);
        if (!$this->uniqueScriptLoaded) {
            if (!$this->redis->script('exists', $sha)[0]) {
                $this->redis->script('load', self::UNIQUE_COUNT_SCRIPT);
            }
            $this->uniqueScriptLoaded = true;
        }
        $count = $this->redis->evalSha($sha, $data);
        if ($count === false) {
            // The script cache may have been dropped (e.g. Redis restart): reload and retry once.
            $this->redis->script('load', self::UNIQUE_COUNT_SCRIPT);
            $count = $this->redis->evalSha($sha, $data);
        }
        return $count;
    }

    /**
     * Reads (one argument) or writes (two arguments) a prefixed Redis key.
     *
     * @param string $key
     * @param mixed $value
     * @return mixed the stored value when reading, nothing when writing
     */
    protected function cache(string $key, $value = null) {
        if (func_num_args() == 1) {
            return $this->redis->get(self::CACHE_PREFIX . $key);
        } else {
            $this->redis->set(self::CACHE_PREFIX . $key, $value);
        }
    }

    /**
     * Creates the Redis client from the configured host and port.
     */
    protected function newRedis() {
        $this->redis = new \Redis([
            'host' => $this->redisHost,
            'port' => $this->redisPort,
            'connectTimeout' => 5,
        ]);
    }

    /**
     * Prepares every [date][domain] record of a batch for storage: adds the
     * date and domain, and merges request and response bytes into "flow".
     * The storage step itself is a placeholder and currently does nothing.
     *
     * @param array $data
     */
    protected function sync(array $data) {
        foreach ($data as $date => $items) {
            foreach ($items as $domain => $item) {
                $item['date'] = $date;
                $item['domain'] = $domain;
                $item['flow'] = $item['req_size'] + $item['res_size'];
                unset($item['req_size'], $item['res_size']);
                /*
                 * Persist $item here.
                 */
            }
        }
    }
}

// Entry point: locate the nginx installation, then collect statistics forever.
// $argv[1] = glob pattern of the log files (relative to the log directory),
// $argv[2] = rotation threshold in bytes.
try {
    $NgnixLog = new NgnixLog();
    // The first sub-directory of /usr/local/nginx is taken as the installation directory.
    $nginxDirs = glob('/usr/local/nginx/*', GLOB_ONLYDIR) ?: [];
    if (!isset($nginxDirs[0])) {
        throw new Exception('未找到nginx安裝目錄:/usr/local/nginx/*');
    }
    $nginxDir = $nginxDirs[0];
    $NgnixLog->setNginxBin($nginxDir . '/sbin/nginx');
    $NgnixLog->run($nginxDir . '/logs/', $argv[1] ?? './*-access.log', intval($argv[2] ?? 10 ** 7));
} catch (Exception $err) {
    // Report the failure and exit non-zero so a supervisor or cron job can detect it.
    echo $err->getMessage() . PHP_EOL;
    exit(1);
}


啓動腳本

腳本可以添加到定時器中,保證腳本正常運行。

#!/bin/bash

# Installation paths: the PHP CLI binary used to run the collector, and the
# directory that is searched for nginx access logs.
PHP_BIN='/usr/local/php/bin/php'
NGINX_LOG_PATH='/usr/local/nginx/logs/'

# Start a background collector for one log file unless one is already running.
#   $1 - log file path as passed to nginx-log.php (e.g. ./example.com-access.log)
run_script(){
    local target="$1"
    local ps_lines

    # Nothing to start for an empty path.
    [ -n "$target" ] || return 0

    ps_lines=$(ps aux | grep php)

    # Quoting keeps one process per line, so both filters must match the SAME
    # process; -F treats the script name and the path as literal text, not as a regex.
    if echo "$ps_lines" | grep -F 'nginx-log.php' | grep -qF -- "$target"; then
        echo '已經運行'
    else
        # Append (all collectors share this file) and capture stderr as well.
        nohup "$PHP_BIN" /www/www-resource/nginx-log.php "$target" >> /www/www-resource/nginx-log.log 2>&1 &
    fi
}

# Find every access log in the log directory and run one collector process per file.
# Paths are listed relative to the log directory (./name), which is the form
# the PHP script expects as its glob pattern.
while IFS= read -r FILE_PATH;do
    # An empty find result still produces one empty here-document line; skip it.
    [ -n "$FILE_PATH" ] || continue
    run_script "$FILE_PATH"
done <<EOF
$(cd "$NGINX_LOG_PATH" && find ./ -maxdepth 1 -name '*-access.log')
EOF