智能运维的数据清洗:去除重复记录

数据清洗的过程包括处理缺失值、去除重复记录、纠正数据错误和标准化数据格式。有效的数据清洗能够提高数据的准确性和一致性,为后续的数据分析和建模奠定坚实的基础。本文讲解去除重复记录的方法。
重复记录是指数据集中存在的多条相同的记录。去除重复记录可以减少数据冗余,保证数据的唯一性。通常通过识别唯一标识符来进行去重处理。在运维工作中,经常需要分析服务器访问日志。示例6-3首先生成模拟的Web服务器访问日志数据,然后进行IP去重,示例文件为demo/code/chapter6/drop_duplicates.py。
【示例6-3】去除重复记录
import re
import random
from collections import defaultdict
from datetime import datetime, timedelta
 
def generate_fake_ips(num_ips=50, num_requests=1000):
    """
    生成模拟IP地址和访问日志
    :param num_ips: 要生成的独立IP数量
    :param num_requests: 要生成的总请求数
    :return: (IP列表, 日志条目列表)
    """
    # 生成基础IP池
    base_ips = [f"192.168.{random.randint(0, 255)}.{random.randint(1, 254)}"  for _ in range(num_ips)]
 
    # 添加一些公共IP和特殊IP
    special_ips = [
        "127.0.0.1",
        "10.0.0.1",
        "172.16.0.1",
        "8.8.8.8",
        "1.1.1.1"
    ]
    ip_pool = base_ips + special_ips
 
    # 生成请求时间范围(最近30天)
    end_time = datetime.now()
    start_time = end_time - timedelta(days=30)
 
    # 生成日志条目
    log_entries = []
    ips = []
    for _ in range(num_requests):
        ip = random.choice(ip_pool)
        ips.append(ip)
 
        # 生成随机时间戳
        time_diff = end_time - start_time
        random_seconds = random.randint(0, int(time_diff.total_seconds()))
        timestamp = start_time + timedelta(seconds=random_seconds)
 
        # 生成随机HTTP方法、路径和状态码
        methods = ["GET", "POST", "PUT", "DELETE", "HEAD"]
        paths = ["/", "/index.html", "/api/data", "/images/logo.png", "/static/style.css"]
        status_codes = [200, 301, 404, 500, 302]
 
        log_entry = (
            f"{ip} - - [{timestamp.strftime('%d/%b/%Y:%H:%M:%S +0000')}] "
            f"\"{random.choice(methods)} {random.choice(paths)} HTTP/1.1\" "
            f"{random.choice(status_codes)} {random.randint(100, 5000)}"
        )
        log_entries.append(log_entry)
 
    return ips, log_entries
 
def save_log_to_file(log_entries, filename):
    """
    将日志保存到文件
    :param log_entries: 日志条目列表
    :param filename: 要保存的文件名
    """
    with open(filename, 'w') as f:
        f.write("\n".join(log_entries))
    print(f"已生成模拟日志文件: {filename} (共 {len(log_entries)} 条记录)")
 
def extract_ips_from_log(log_file):
    """
    从日志文件中提取IP地址
    :param log_file: 日志文件路径
    :return: IP地址列表
    """
    ip_pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
    ips = []
 
    try:
        with open(log_file, 'r') as f:
            for line in f:
                match = re.search(ip_pattern, line)
                if match:
                    ips.append(match.group())
    except FileNotFoundError:
        print(f"错误:文件 {log_file} 未找到")
        return []
 
    return ips
 
def remove_duplicates(ip_list):
    """
    去除重复IP并统计出现次数
    :param ip_list: 包含重复IP的列表
    :return: (唯一IP列表, {IP: 出现次数}字典)
    """
    ip_count = defaultdict(int)
    for ip in ip_list:
        ip_count[ip] += 1
 
    unique_ips = list(ip_count.keys())
    return unique_ips, ip_count
 
def analyze_ips(log_file, output_file=None):
    """
    分析日志文件中的IP地址
    :param log_file: 输入日志文件
    :param output_file: 结果输出文件(可选)
    """
    print(f"正在分析日志文件: {log_file}")
    ips = extract_ips_from_log(log_file)
 
    if not ips:
        print("未提取到任何IP地址")
        return
 
    unique_ips, ip_count = remove_duplicates(ips)
 
    print(f"\n分析结果:")
    print(f"总请求数: {len(ips)}")
    print(f"独立IP数量: {len(unique_ips)}")
    print("\n出现次数最多的10个IP:")
    sorted_ips = sorted(ip_count.items(), key=lambda x: x[1], reverse=True)
    for ip, count in sorted_ips[:10]:
        print(f"{ip}: {count}次")
 
    if output_file:
        with open(output_file, 'w') as f:
            f.write("独立IP列表:\n")
            f.write("\n".join(sorted(unique_ips)))
        print(f"\n结果已保存到: {output_file}")
 
def main():
    # 生成模拟数据
    print("正在生成模拟日志数据...")
    ips, log_entries = generate_fake_ips(num_ips=50, num_requests=2000)
    log_file = "data/simulated_access.log"
    save_log_to_file(log_entries, log_file)
 
    # 分析生成的日志文件
    output_file = "data/unique_ips.txt"
    analyze_ips(log_file, output_file)
 
    # 显示一些统计信息
    print("\n模拟数据统计:")
    print(f"生成的独立IP数量: {len(set(ips))}")
    print(f"生成的重复IP率: {(1 - len(set(ips))/len(ips))*100:.2f}%")
 
f __name__ == "__main__":
    main()
代码解释:


(1)generate_fake_ips函数:生成指定数量的随机IP地址(包括一些特殊IP),为每个请求生成合理的日志条目,包含:
随机IP地址。
随机时间戳(分布在最近30天内)。
随机HTTP方法、路径和状态码。
随机响应大小。
(2)save_log_to_file函数:将生成的模拟日志保存到文件,格式类似于常见的Nginx/Apache访问日志。
(3)extract_ips_from_log函数:使用正则表达式从每行日志中提取IP地址,返回包含所有IP地址的列表(可能有重复)。
(4)remove_duplicates函数:使用defaultdict统计每个IP的出现次数,返回唯一IP列表和IP计数字典。
(5)analyze_ips函数:整合整个分析流程,输出统计信息和前10个最活跃IP,可以选择将结果保存到文件中。
示例6-3的运行结果如下:
正在生成模拟日志数据...
已生成模拟日志文件: data/simulated_access.log (共 2000 条记录)
正在分析日志文件: data/simulated_access.log
 
分析结果:
总请求数: 2000
独立IP数量: 55
 
出现次数最多的10个IP:
1.1.1.1: 50次
192.168.191.56: 48次
192.168.134.164: 47次
192.168.127.235: 47次
192.168.108.168: 47次
172.16.0.1: 45次
192.168.95.230: 43次
192.168.109.128: 43次
192.168.167.214: 42次
192.168.176.70: 42次
 
结果已保存到: data/unique_ips.txt
 
模拟数据统计:
生成的独立IP数量: 55
生成的重复IP率: 97.25%

本文节选自《智能运维实践》一书,获出版社和作者授权发布,仅供读者个人学习使用。

请使用浏览器的分享功能分享到微信等