Administrator
发布于 2026-01-06 / 5 阅读
0
0

IO多路复用-poll

C++语言完整实现

#include <iostream>
#include <vector>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <fcntl.h>
#include <errno.h>

class PollServer {
private:
    int server_fd;
    int port;
    std::vector<pollfd> fds;
    
public:
    PollServer(int port) : port(port), server_fd(-1) {}
    
    ~PollServer() {
        cleanup();
    }
    
    bool initialize() {
        // 创建socket
        server_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (server_fd < 0) {
            std::cerr << "创建socket失败: " << strerror(errno) << std::endl;
            return false;
        }
        
        // 设置socket选项,允许地址重用
        int opt = 1;
        if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
            std::cerr << "设置socket选项失败: " << strerror(errno) << std::endl;
            return false;
        }
        
        // 绑定地址
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_port = htons(port);
        
        if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
            std::cerr << "绑定地址失败: " << strerror(errno) << std::endl;
            return false;
        }
        
        // 监听连接
        if (listen(server_fd, 10) < 0) {
            std::cerr << "监听失败: " << strerror(errno) << std::endl;
            return false;
        }
        
        // 设置非阻塞模式(可选)
        fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);
        
        // 将server_fd添加到pollfd数组
        pollfd server_pollfd;
        server_pollfd.fd = server_fd;
        server_pollfd.events = POLLIN;  // 监听可读事件
        server_pollfd.revents = 0;
        fds.push_back(server_pollfd);
        
        std::cout << "服务器初始化成功,监听端口: " << port << std::endl;
        return true;
    }
    
    void run() {
        std::cout << "服务器开始运行..." << std::endl;
        
        while (true) {
            // 调用poll等待事件
            int ret = poll(fds.data(), fds.size(), -1);  // -1表示无限等待
            
            if (ret < 0) {
                std::cerr << "poll错误: " << strerror(errno) << std::endl;
                break;
            }
            
            if (ret == 0) {
                // 超时,但这里不会发生,因为timeout=-1
                continue;
            }
            
            // 检查所有文件描述符
            for (size_t i = 0; i < fds.size(); i++) {
                if (fds[i].revents == 0) {
                    continue;
                }
                
                // 处理错误或挂断事件
                if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    if (fds[i].fd == server_fd) {
                        std::cerr << "服务器socket错误,停止服务" << std::endl;
                        return;
                    } else {
                        std::cout << "客户端连接关闭或出错: fd=" << fds[i].fd << std::endl;
                        close(fds[i].fd);
                        fds.erase(fds.begin() + i);
                        i--;  // 调整索引
                        continue;
                    }
                }
                
                // 处理可读事件
                if (fds[i].revents & POLLIN) {
                    if (fds[i].fd == server_fd) {
                        // 有新的客户端连接
                        handleNewConnection();
                    } else {
                        // 客户端有数据可读
                        handleClientData(i);
                    }
                }
            }
        }
    }
    
private:
    void handleNewConnection() {
        struct sockaddr_in client_addr;
        socklen_t addr_len = sizeof(client_addr);
        
        // 接受新连接
        int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &addr_len);
        
        if (client_fd < 0) {
            std::cerr << "接受连接失败: " << strerror(errno) << std::endl;
            return;
        }
        
        // 设置非阻塞模式
        fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);
        
        // 添加新的客户端到pollfd数组
        pollfd client_pollfd;
        client_pollfd.fd = client_fd;
        client_pollfd.events = POLLIN;
        client_pollfd.revents = 0;
        fds.push_back(client_pollfd);
        
        std::cout << "新客户端连接: fd=" << client_fd 
                  << " IP=" << inet_ntoa(client_addr.sin_addr) 
                  << ":" << ntohs(client_addr.sin_port) << std::endl;
    }
    
    void handleClientData(size_t index) {
        char buffer[1024];
        ssize_t bytes_read;
        
        // 读取数据
        bytes_read = recv(fds[index].fd, buffer, sizeof(buffer) - 1, 0);
        
        if (bytes_read <= 0) {
            // 连接关闭或错误
            if (bytes_read == 0) {
                std::cout << "客户端断开连接: fd=" << fds[index].fd << std::endl;
            } else {
                std::cerr << "读取错误: " << strerror(errno) << std::endl;
            }
            close(fds[index].fd);
            fds.erase(fds.begin() + index);
            return;
        }
        
        // 确保字符串以null结尾
        buffer[bytes_read] = '\0';
        
        std::cout << "收到客户端 " << fds[index].fd << " 的数据: " << buffer;
        
        // 回显数据给客户端
        std::string response = "服务器收到: ";
        response += buffer;
        
        ssize_t bytes_sent = send(fds[index].fd, response.c_str(), response.length(), 0);
        
        if (bytes_sent < 0) {
            std::cerr << "发送失败: " << strerror(errno) << std::endl;
            close(fds[index].fd);
            fds.erase(fds.begin() + index);
        }
    }
    
    void cleanup() {
        // 关闭所有客户端连接
        for (size_t i = 0; i < fds.size(); i++) {
            if (fds[i].fd >= 0) {
                close(fds[i].fd);
            }
        }
        fds.clear();
        
        // 关闭服务器socket
        if (server_fd >= 0) {
            close(server_fd);
            server_fd = -1;
        }
        
        std::cout << "服务器资源已清理" << std::endl;
    }
};

int main() {
    PollServer server(8888);
    
    if (!server.initialize()) {
        std::cerr << "服务器初始化失败" << std::endl;
        return 1;
    }
    
    server.run();
    
    return 0;
}


评论