Log Analysis: Pino + AWS Glue + AWS S3 + Athena

Author: Shelton Ma

To log with pino, ship the logs to Amazon S3, and analyze them with Athena, follow these steps:

1. Configure the Pino log format

Athena works best with structured logs, so output logs as JSON: they are easy to store on S3 and for Athena to parse.

import pino from 'pino';

const logger = pino({
  level: 'info',
  formatters: {
    level(label) {
      return { level: label };
    }
  },
  timestamp: pino.stdTimeFunctions.isoTime // emits an ISO "time" field that Athena can parse
});
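With this configuration, each record is one JSON line. An illustrative example (the field values are made up; note that pino also adds `pid` and `hostname` by default):

```json
{"level":"info","time":"2025-01-01T12:00:00.000Z","pid":1234,"hostname":"ip-10-0-0-1","event":"user_login","user":"john_doe","status":"success"}
```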

2. Ship logs to S3

1. Use Amazon CloudWatch Logs + S3 export

  1. Install the agent: sudo yum install -y amazon-cloudwatch-agent

  2. Configure the CloudWatch Logs agent

    {
      "logs": {
        "logs_collected": {
          "files": {
            "collect_list": [
              {
                "file_path": "/var/log/myapp.log",
                "log_group_name": "my-app-logs",
                "log_stream_name": "{instance_id}"
              }
            ]
          }
        }
      }
    }
    
  3. Start the agent

    sudo amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c file:/opt/aws/amazon-cloudwatch-agent.json -s
    
  4. Periodically export CloudWatch Logs to S3
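The export in step 4 can be driven by the CloudWatch Logs export API. A sketch of a one-off export of the last 24 hours via the AWS CLI (the bucket name and prefix are assumptions, and the destination bucket must grant CloudWatch Logs write access):

```shell
# create-export-task takes epoch milliseconds; export the last 24 hours
FROM=$(( ($(date +%s) - 86400) * 1000 ))
TO=$(( $(date +%s) * 1000 ))

aws logs create-export-task \
  --log-group-name my-app-logs \
  --from "$FROM" \
  --to "$TO" \
  --destination my-app-logs-bucket \
  --destination-prefix exported-logs
```

Export tasks run one at a time per account and are batch-oriented; for continuous delivery to S3, a subscription filter into Kinesis Data Firehose is the usual alternative.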

2. Upload log files with the AWS SDK (or stream them via Kafka)

import fs from 'fs';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
import pino from 'pino';

const s3Client = new S3Client({ region: 'us-east-1' });
const logger = pino(pino.destination('./logs/app-log.json'));

// Periodically upload the local log file to S3
async function uploadLogsToS3() {
  const logData = fs.readFileSync('./logs/app-log.json', 'utf8');
  if (!logData) return; // nothing to upload yet

  const params = {
    Bucket: 'my-app-logs',
    Key: `logs/${new Date().toISOString().split('T')[0]}/app-log-${Date.now()}.json`,
    Body: logData,
    ContentType: 'application/json'
  };

  await s3Client.send(new PutObjectCommand(params));

  // Truncate the local log file after a successful upload
  fs.writeFileSync('./logs/app-log.json', '');
}

// Upload every 5 minutes
setInterval(uploadLogsToS3, 300000);

// Example log entries
logger.info({ event: 'user_login', user: 'john_doe', status: 'success' });
logger.error({ event: 'db_error', error: 'Connection timeout' });

3. Athena/Glue configuration and analysis

  1. Create a table in the AWS Glue Data Catalog

  2. Example queries

    -- Pino writes the timestamp to a field named "time"; TIME and USER are
    -- reserved words in Athena, so quote them.
    SELECT "time", "user", status
    FROM logs
    WHERE event = 'user_login'
      AND status = 'failed'
      AND from_iso8601_timestamp("time") >= current_timestamp - interval '7' day;

    SELECT "time", error
    FROM logs
    WHERE level = 'error'
    ORDER BY "time" DESC;
    
  3. Connect the Athena data source to a visualization tool (e.g. Amazon QuickSight)
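For reference, the Glue/Athena table behind the queries in step 2 could be declared with DDL like the following sketch (the database location, bucket, and column set are assumptions based on the log fields above):

```sql
CREATE EXTERNAL TABLE IF NOT EXISTS logs (
  `time`   string,
  `level`  string,
  `event`  string,
  `user`   string,
  `status` string,
  `error`  string
)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-app-logs/logs/';
```

With Hive-style `dt=YYYY-MM-DD/` prefixes in S3, `MSCK REPAIR TABLE logs;` picks up new partitions; otherwise add them individually with `ALTER TABLE logs ADD PARTITION`.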

4. Optimizations

  1. Partitioned storage: partition logs in S3 by project/date to speed up Athena queries.
  2. Compression: use GZIP, or columnar formats such as Parquet or ORC, to further improve query performance and cut storage costs.
  3. Log archiving: use AWS S3 Lifecycle rules to transition old logs to Glacier and reduce storage costs.
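Optimization 3 can be expressed as an S3 lifecycle configuration. A sketch (the prefix and day thresholds are assumptions) that could be applied with `aws s3api put-bucket-lifecycle-configuration`:

```json
{
  "Rules": [
    {
      "ID": "archive-old-logs",
      "Filter": { "Prefix": "logs/" },
      "Status": "Enabled",
      "Transitions": [
        { "Days": 90, "StorageClass": "GLACIER" }
      ],
      "Expiration": { "Days": 365 }
    }
  ]
}
```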