muduo库的模拟实现——TcpServer部分

一、Connection模块

Connection类可以说是muduo库里最复杂也是最核心的类的,在我看来这个类有承上启下的作用,承上是通过TcpServer模块与上层应用层协议进行交互,启下关联的是EventLoop等一系列Reactor部分的模块。所以本篇文章会着重介绍Connection模块的实现。

1.成员变量

我们先来看一下Connection类的成员变量:

    private:
        uint64_t _connectionID;    // 连接的唯一ID,便于连接的管理和查找
        uint64_t _timerID;         // 定时器ID,必须是唯一的,这块为了简化操作,使用_connectionID作为定时器ID
        int _sockfd;               // 连接关联的文件描述符
        bool _inactiveReleaseFlag; // 连接是否启动非活跃销毁的判断标志,默认为false表示关闭
        EventLoop *_eventLoop;     // 连接所关联的一个EventLoop
        ConnectionStatus _status;  // 连接的状态
        Socket _socket;            // 套接字操作管理
        Channel _channel;          // 连接的事件管理
        Buffer _inBUffer;          // 输入缓冲区---存放从socket中读取到的数据
        Buffer _outBuffer;         // 输出缓冲区---存放要发送给对端的数据
        ConnectedCallBack _connectedCallBack;
        MessageCallBack _messageCallBack;
        CloseCallBack _closeCallBack;
        EventCallBack _eventCallBack;

        // 这是组件内的连接关闭回调函数,是组件内设置的,因为服务器组件内会把所有的连接管理起来
        // 一旦某个连接要关闭了,就应该从管理的地方移除掉自己的信息
        CloseCallBack _serverCloseCallBack;
        Any _context; // 请求的接收处理上下文

其中有一个EventLoop对象_eventLoop,这是每个Connection都要关联的一个EventLoop对象,将来构造Connection对象的时候需要通过EventLoop对象来构造。什么时候会构造Connection对象呢?当主线程接收到新连接的时候。那由谁来构造Connection对象呢?当新连接到来的时候,TcpServer服务器会从线程池中选择一个子线程,由于子线程与EventLoop对象是一一对应的,所以可以通过子线程的EventLoop对象来构造Connection对象,这样就能将新连接通过Connection对象管理起来了。

Connection类有了EventLoop对象以后,还需要有Channel对象,用EventLoop对象构造一个Channel对象进行连接的事件管理。因为新连接会有IO数据通信,所以需要Channel对象来监控这些事件。连接到来以后,Connection模块通过Channel对象启动可读事件监控,就可以等待对方发送数据过来。当服务端要响应回去的时候,再通过Channel对象启动可写事件监控,就可以等待数据的发送。

既然涉及到数据的发送,那就必须要有缓冲区了,所以Connection类有接收缓冲区_inbuffer,和发送缓冲区_outbuffer。因为我们通过Channel对象启动读事件监控以后,当对方发送数据过来以后,比如发送的是HTTP请求报文,这时候就会调用可读事件的回调函数,去read读取这个请求报文,但读取上来不可能是Connection模块处理,只有HTTP模块能处理这些请求报文,所以Connection模块只负责数据的接收和保存,保存就保存在它的接收缓冲区中,等待上层HTTP模块来提取并解析。并且,通过缓冲区也能解决数据读取不完整或者出现粘包问题,因为上层一旦解析到报文不完整,就不会设置解析状态为finish完成解析,会继续等待Connection接收缓冲区的下一轮数据,不过这都是上层HTTP协议层要关心的事情,Connection模块并不关心,这就是缓冲区在这里的意义。

除此之外,Connection类还需要提供回调函数供上层使用者去设置,这些回调函数会在下面详细介绍。最后,Connection类还有一个ConnectionStatus状态机,就是用来表示当前连接的状态,分别有连接关闭状态、连接刚建立状态、连接完成状态和连接待关闭状态。

    typedef enum
    {
        DISCONNECTED, // 连接关闭状态
        CONNECTING,   // 连接刚建立的状态
        CONNECTED,    // 连接完成状态,可以进行各种通讯数据操作了
        DISCONNECTING // 连接待关闭状态
    } ConnectionStatus;

2.构造函数以及Channel的五个事件回调函数

介绍了成员变量以后,接下来再看看Connection类的构造函数,构造函数主要是对成员变量进行一些初始化赋值,并且设置channel对象的五个事件回调函数,这五个事件回调函数是Connection类提供的。

        Connection(EventLoop *eventLoop, uint64_t connectionID, int sockfd)
        :_connectionID(connectionID), _sockfd(sockfd), _inactiveReleaseFlag(false), _eventLoop(eventLoop), 
        _status(CONNECTING), _socket(_sockfd), _channel(eventLoop, _sockfd)
        {
            _channel.setCloseCallBack(std::bind(&Connection::handleClose, this));
            _channel.setEventCallBack(std::bind(&Connection::handleEvent, this));
            _channel.setReadAbleCallBack(std::bind(&Connection::handleRead, this));
            _channel.setWriteAbleCallBack(std::bind(&Connection::handleWrite, this));
            _channel.setErrorCallBack(std::bind(&Connection::handleError, this));
        }

下面再来介绍五个事件回调函数的实现,这五个事件回调函数就是连接关联的从属Reactor监控到事件IO事件触发以后,执行的回调函数,比如客户端向连接发送HTTP请求报文,那么epoll模型就会监控到可读事件触发,通过Channel对象去调用可读事件触发的回调函数,而这个函数绑定的就是Connection类实现的handleRead函数,这个函数的逻辑就是通过read接口或者recv接口将数据从连接的套接字中读取上来,至于如何处理那就是上层HTTP协议层需要关心的事情了。

handleClose函数:
首先看一下handleClose函数的实现,这是关闭连接事件触发的回调函数,一旦连接关闭了,套接字就不能做其它操作了,我们就没必要保存记录这个连接的信息了,所以需要关闭连接。但是关闭连接并不能立即关闭,因为有可能接收缓冲区里还有数据没有被处理,所以必须将接收缓冲区的数据通过_messageCallBack回调函数交给上层的协议层去处理,处理完毕以后,再调用releaseInLoop函数执行关闭连接的操作。releaseInLoop函数也是Connection类实现的成员函数,这是真正关闭释放连接的函数,下面会介绍。

        // 描述符触发挂断事件
        void handleClose()
        {
            // 一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,处理完毕以后就关闭连接
            if(_inBUffer.getReadableSize() > 0)
            {
                _messageCallBack(shared_from_this(), &_inBUffer);
            }
            return releaseInLoop();
        }

handleEvent函数:
我们之前实现Channel类的时候,可读事件触发就调用可读事件绑定的回调函数,可写事件触发就调用可写事件绑定的回调函数,但无论是什么事件触发,最后都会调用handleEvent回调函数,这是任意事件触发都会调用的回调函数,所以Connection也要提供一个任意事件触发的回调函数。这个函数接口是用来实现其它四个事件回调函数都要执行的操作,为了不让代码重复冗余,就把这些操作单独拿出来执行,比如任意事件触发都要刷新定时器任务。

        // 描述符触发任意事件
        void handleEvent()
        {
            // 1.刷新连接的活跃度---延迟定时销毁任务
            if(_inactiveReleaseFlag == true)
            {
                _eventLoop->refreshTimer(_connectionID);
            }
            // 2.调用组件使用者的任意事件回调
            if(_eventCallBack)
            {
                _eventCallBack(shared_from_this());
            }
        }

handleRead函数:
handleRead函数就是可读事件触发的回调函数,当连接的文件描述符有数据到来的时候,可读事件就会触发,这个函数就会被调用。该函数会将文件描述符的数据都读取上来,保存在inBuffer接收缓冲区中,然后将接收缓冲区的数据通过_messageCallBack函数传递给上层,让上层去处理这些数据。_messageCallBack函数是外部设置的回调函数,一般绑定的就是应用层协议进行报文接收并解析的函数,也就是Connection负责将数据读取上来,应用层的_messageCallBack函数负责接收和解析报文。

        // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用messageCallBack
        void handleRead()
        {
            // 1.接收socket的数据放到缓冲区
            char buffer[65536];
            ssize_t recvRes = _socket.nonBlockRecv(buffer, 65535);
            if (recvRes < 0)
            {
                // 出错了,但也不能直接关闭连接,需要先看一下有没有数据还没有处理
                // 得要看一下发送缓冲区还有没有数据没有发送
                // 接收缓冲区还有没有数据没有处理
                return shutdownInLoop();
            }
            // 将数据放入接收缓冲区,并且将写偏移向后移动
            _inBUffer.writeAndPush(buffer, recvRes);
            // 2.调用messageCallBack进行业务处理
            if (_inBUffer.getReadableSize() > 0)
            {
                // shared_from_this---从当前对象自身获取自身的shared_ptr
                return _messageCallBack(shared_from_this(), &_inBUffer);
            }
        }

handleWrite函数:
handleWrite函数和handleRead函数是同理的,当连接的可写事件触发的时候,就代表服务器可以向对方发送数据了,于是就将outBuffer发送缓冲区里的数据发送出去。发送完之后,如果发送缓冲区中已经没有数据了,就可以关闭写事件监控了。

        // 描述符可写事件触发后调用的函数,将发送缓冲区的数据进行发送
        void handleWrite()
        {
            // outBuffer中保存的数据就是要发送的数据
            ssize_t writeRes = _socket.nonBlockSend(_outBuffer.getReadStartPosition(), _outBuffer.getReadableSize());
            if (writeRes < 0)
            {
                // 发送错误就该关闭连接了,但是可能缓冲区中还有数据需要被处理
                if (_inBUffer.getReadableSize() > 0)
                {
                    _messageCallBack(shared_from_this(), &_inBUffer);
                    // 这时候就是实际的关闭释放操作了
                    return releaseInLoop();
                }
            }
            // 还需要将读偏移向后移动
            _outBuffer.moveReadOffset(writeRes);

            // 如果发送缓冲区没有数据可写,就要关闭写事件监控
            if (_outBuffer.getReadableSize() == 0)
            {
                _channel.closeWriteAbleEvent();
                // 如果当前是连接待关闭状态,则有数据就发送完数据释放连接,没有数据就直接释放连接
                if (_status == DISCONNECTING)
                {
                    return releaseInLoop();
                }
            }
        }

handleError函数:
handleError函数实现的比较简单粗暴,只要是出错了,那就调用handleClose函数关闭连接即可。

        // 描述符触发出错事件
        void handleError()
        {
            handleClose();
        }

3.建立连接接口

当新连接到来的时候,TcpServer模块为新连接创建好Connection对象并且设置好一些列回调函数以后,就要调用Connection对象的建立连接接口,这个有点像Connection对象的启动接口吧,将连接状态设置为已建立连接状态,然后启动Channel对象的可读事件监控,最后调用应用层协议的建立连接函数_connectedCallBack,有可能应用层协议也需要对新连接进行一些处理。

        // 连接获取之后所处的状态下要进行的各种设置
        // 启动读监控、调用回调函数
        // 该函数执行完毕以后,则连接进入已完成连接状态
        void establishedInLoop()
        {
            // 1.修改连接状态
            // 首先当前状态必须是上层的半连接状态
            assert(_status == CONNECTING);
            _status = CONNECTED;
            // 一旦启动读事件监控,就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁
            // 2.启动读事件监控
            _channel.startReadAbleEvent();
            // 3.调用回调函数
            if(_connectedCallBack)
            {
                _connectedCallBack(shared_from_this());
            }
            
        }

4.发送接口

我们知道当上层协议构建好响应报文之后,就要交给Connection模块进行发送,所以Connection需要一个发送接口,供给外部协议使用。外部只需要传递Buffer缓冲区,该函数就会把形参缓冲区的数据拷贝到自己的发送缓冲区中。注意,这个函数并不是实际的发送数据的函数,因为我们不确定现在是否可以发送数据,如果不可发送而盲目发送,就有可能会导致要发送的数据出现丢失。所以我们必须让epoll模型帮我们监控可写事件,当可写事件触发的时候才代表可以发送数据,然后通过handleWrite函数将数据发送出去。

        // 这个接口并不是实际的发送数据接口,而是把数据放到了发送缓冲区,启动了可写事件监控
        void sendInLoop(Buffer &buffer)
        {
            if(_status == DISCONNECTED)
            {
                return;
            }
            _outBuffer.writeBufferAndPush(buffer);
            if(_channel.isWriteAble() == false)
            {
                _channel.startWriteAbleEvent();
            }
        }

5.关闭连接接口

关闭连接接口是shutdownInLoop,这个函数接口也不是直接关闭连接,因为可能接收缓冲区和发送缓冲区有数据需要处理,所以在关闭之前需要检查并处理这些数据。如果接收缓冲区还有数据要处理,就调用_messageCallBack函数让上层去处理。如果发送缓冲区还有数据要处理,就启动Channel对象的可写事件监控,将发送缓冲区的数据发送出去。

		// 实际的释放接口
        void releaseInLoop()
        {
            // 1.修改连接状态,将其置为DISCONNECTED
            _status = DISCONNECTED;
            // 2.移除连接的事件监控
            _channel.removeEvent();
            // 3.关闭描述符
            _socket.closeSocket();
            // 4.如果当前定时器队列中还有定时销毁任务,则取消任务
            if(_eventLoop->hasTimer(_connectionID))
            {
                cancelInactiveReleaseInLoop();
            }
            // 5.调用关闭回调函数
            // 避免先移除服务器管理的连接信息导致Connection被释放再去处理,可能会出错
            // 所以要先调用用户的关闭事件
            if(_closeCallBack)
            {
                _closeCallBack(shared_from_this());
            }
            // 移除服务器内部管理的连接信息
            if(_serverCloseCallBack)
            {
                _serverCloseCallBack(shared_from_this());
            }
        }

        // 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
        void shutdownInLoop()
        {
            // 设置连接为半关闭状态
            _status = DISCONNECTING;
            if(_inBUffer.getReadableSize() > 0)
            {
                if(_messageCallBack)
                {
                    _messageCallBack(shared_from_this(), &_inBUffer);
                }
            }
            // 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭
            if(_outBuffer.getReadableSize() > 0)
            {
                if(_channel.isWriteAble() == false)
                {
                    _channel.startWriteAbleEvent();
                }
            }
            if(_outBuffer.getReadableSize() == 0)
            {
                releaseInLoop();
            }
        }

至此Connection模块的大部分功能都已经实现了,接下来的一些函数接口都比较简单,或者是这些函数的封装与复用,这里就不再过多介绍了,可以直接看完整代码:

namespace ns_connection
{
    typedef enum
    {
        DISCONNECTED, // 连接关闭状态
        CONNECTING,   // 连接刚建立的状态
        CONNECTED,    // 连接完成状态,可以进行各种通讯数据操作了
        DISCONNECTING // 连接关闭状态
    } ConnectionStatus;

    class Connection;
    // 这四个回调函数,是让组件使用者来设置的
    using ConnectionSharedPtr = std::shared_ptr<Connection>;
    using ConnectedCallBack = std::function<void(const ConnectionSharedPtr &)>;
    using MessageCallBack = std::function<void(const ConnectionSharedPtr &, Buffer *)>;
    using CloseCallBack = std::function<void(const ConnectionSharedPtr &)>;
    using EventCallBack = std::function<void(const ConnectionSharedPtr &)>;

    class Connection : public std::enable_shared_from_this<Connection>
    {
    public:
        Connection(EventLoop *eventLoop, uint64_t connectionID, int sockfd)
        :_connectionID(connectionID), _sockfd(sockfd), _inactiveReleaseFlag(false), _eventLoop(eventLoop), 
        _status(CONNECTING), _socket(_sockfd), _channel(eventLoop, _sockfd)
        {
            _channel.setCloseCallBack(std::bind(&Connection::handleClose, this));
            _channel.setEventCallBack(std::bind(&Connection::handleEvent, this));
            _channel.setReadAbleCallBack(std::bind(&Connection::handleRead, this));
            _channel.setWriteAbleCallBack(std::bind(&Connection::handleWrite, this));
            _channel.setErrorCallBack(std::bind(&Connection::handleError, this));
        }

        ~Connection()
        {
            LOG("RELEASE CONNECTION:%p", this);
        }

        // 发送数据,将数据放到发送缓冲区,启动写事件监控
        void send(const char *data, size_t len)
        {
            // 注意:外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行
            // 因此有可能执行的时候,data指向的空间已经被释放了。
            Buffer buffer;
            buffer.writeAndPush(data, len);
            _eventLoop->runInLoop(std::bind(&Connection::sendInLoop, this, std::move(buffer)));
        }

        // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理
        void shutdown()
        {
            _eventLoop->runInLoop(std::bind(&Connection::shutdownInLoop, this));
        }

        // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务
        void startInactiveRelease(int sec)
        {
            _eventLoop->runInLoop(std::bind(&Connection::startInactiveReleaseInLoop, this, sec));
        }

        // 取消非活跃销毁
        void cancelInactiveRelease(int sec)
        {
            _eventLoop->runInLoop(std::bind(&Connection::cancelInactiveReleaseInLoop, this));
        }

        // 切换协议---重置上下文以及阶段性处理函数 -- 这个接口必须在eventLoop线程中立即执行
        // 防备新的事件触发后,处理的时候,切换任务还没有被执行---会导致数据使用原协议处理了
        void upGrade(const Any &context, const ConnectedCallBack &connectedCallBack, const MessageCallBack &messageCallBack,
                     const CloseCallBack &closeCallBack, const EventCallBack &eventCallBack)
        {
            // 所以这里需要断言一下,判断当前线程只能是eventLoop线程
            _eventLoop->assertInLoop();
            _eventLoop->runInLoop(std::bind(&Connection::upGradeInLoop, this, context, connectedCallBack, 
                                messageCallBack, closeCallBack, eventCallBack));
        }

        // 获取管理的文件描述符
        int getSocketfd()
        {
            return _sockfd;
        }

        // 获取连接ID
        int getConnectionID()
        {
            return _connectionID;
        }

        // 返回是否处于连接状态
        bool isConnected()
        {
            return (_status == CONNECTED);
        }

        // 设置上下文
        void setContext(const Any &context)
        {
            _context = context;
        }

        // 获取上下文
        Any *getContext()
        {
            return &_context;
        }

        void setConnectedCallBack(const ConnectedCallBack &callBack)
        {
            _connectedCallBack = callBack;
        }

        void setMessageCallBack(const MessageCallBack &callBack)
        {
            _messageCallBack = callBack;
        }

        void setCloseCallBack(const CloseCallBack &callBack)
        {
            _closeCallBack = callBack;
        }

        void setEventCallBack(const EventCallBack &callBack)
        {
            _eventCallBack = callBack;
        }

        void setServerClosedCallBack(const CloseCallBack &callBack)
        {
            _serverCloseCallBack = callBack;
        }

        // 连接建立就绪后,进行channel回调设置以及启动读监控
        void established()
        {
            _eventLoop->runInLoop(std::bind(&Connection::establishedInLoop, this));
        }

    private:
        // 连接获取之后所处的状态下要进行的各种设置
        // 启动读监控、调用回调函数
        // 该函数执行完毕以后,则连接进入已完成连接状态
        void establishedInLoop()
        {
            // 1.修改连接状态
            // 首先当前状态必须是上层的半连接状态
            assert(_status == CONNECTING);
            _status = CONNECTED;
            // 一旦启动读事件监控,就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁
            // 2.启动读事件监控
            _channel.startReadAbleEvent();
            // 3.调用回调函数
            if(_connectedCallBack)
            {
                _connectedCallBack(shared_from_this());
            }
            
        }

        // 这个接口并不是实际的发送数据接口,而是把数据放到了发送缓冲区,启动了可写事件监控
        void sendInLoop(Buffer &buffer)
        {
            if(_status == DISCONNECTED)
            {
                return;
            }
            _outBuffer.writeBufferAndPush(buffer);
            if(_channel.isWriteAble() == false)
            {
                _channel.startWriteAbleEvent();
            }
        }

        // 实际的释放接口
        void releaseInLoop()
        {
            // 1.修改连接状态,将其置为DISCONNECTED
            _status = DISCONNECTED;
            // 2.移除连接的事件监控
            _channel.removeEvent();
            // 3.关闭描述符
            _socket.closeSocket();
            // 4.如果当前定时器队列中还有定时销毁任务,则取消任务
            if(_eventLoop->hasTimer(_connectionID))
            {
                cancelInactiveReleaseInLoop();
            }
            // 5.调用关闭回调函数
            // 避免先移除服务器管理的连接信息导致Connection被释放再去处理,可能会出错
            // 所以要先调用用户的关闭事件
            if(_closeCallBack)
            {
                _closeCallBack(shared_from_this());
            }
            // 移除服务器内部管理的连接信息
            if(_serverCloseCallBack)
            {
                _serverCloseCallBack(shared_from_this());
            }
        }

        // 这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
        void shutdownInLoop()
        {
            // 设置连接为半关闭状态
            _status = DISCONNECTING;
            if(_inBUffer.getReadableSize() > 0)
            {
                if(_messageCallBack)
                {
                    _messageCallBack(shared_from_this(), &_inBUffer);
                }
            }
            // 要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭
            if(_outBuffer.getReadableSize() > 0)
            {
                if(_channel.isWriteAble() == false)
                {
                    _channel.startWriteAbleEvent();
                }
            }
            if(_outBuffer.getReadableSize() == 0)
            {
                releaseInLoop();
            }
        }

        // 启动非活跃连接超时释放规则
        void startInactiveReleaseInLoop(int sec)
        {
            // 1.将判断标志_inactiveReleaseFlag置为true
            _inactiveReleaseFlag = true;
            // 2.如果当前定时销毁任务已经存在,那就刷新延迟一下即可
            if(_eventLoop->hasTimer(_connectionID) == true)
            {
                return _eventLoop->refreshTimer(_connectionID);
            }
            // 3.如果不存在定时销毁任务,则新增
            _eventLoop->addTimer(_connectionID, sec, std::bind(&Connection::releaseInLoop, this));
        }

        void cancelInactiveReleaseInLoop()
        {
            _inactiveReleaseFlag = false;
            if(_eventLoop->hasTimer(_connectionID) == true)
            {
                _eventLoop->cancelTimer(_connectionID);
            }
        }

        void upGradeInLoop(const Any &context,
                           const ConnectedCallBack &connectedCallBack,
                           const MessageCallBack &messageCallBack,
                           const CloseCallBack &closeCallBack,
                           const EventCallBack &eventCallBack)
        {
            _context = context;
            _connectedCallBack = connectedCallBack;
            _messageCallBack = messageCallBack;
            _closeCallBack = closeCallBack;
            _eventCallBack = eventCallBack;
        }

        // 五个channel事件的回调函数

        // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用messageCallBack
        void handleRead()
        {
            // 1.接收socket的数据放到缓冲区
            char buffer[65536];
            ssize_t recvRes = _socket.nonBlockRecv(buffer, 65535);
            if (recvRes < 0)
            {
                // 出错了,但也不能直接关闭连接,需要先看一下有没有数据还没有处理
                // 得要看一下发送缓冲区还有没有数据没有发送
                // 接收缓冲区还有没有数据没有处理
                return shutdownInLoop();
            }
            // 将数据放入接收缓冲区,并且将写偏移向后移动
            _inBUffer.writeAndPush(buffer, recvRes);
            // 2.调用messageCallBack进行业务处理
            if (_inBUffer.getReadableSize() > 0)
            {
                // shared_from_this---从当前对象自身获取自身的shared_ptr
                return _messageCallBack(shared_from_this(), &_inBUffer);
            }
        }

        // 描述符可写事件触发后调用的函数,将发送缓冲区的数据进行发送
        void handleWrite()
        {
            // outBuffer中保存的数据就是要发送的数据
            ssize_t writeRes = _socket.nonBlockSend(_outBuffer.getReadStartPosition(), _outBuffer.getReadableSize());
            if (writeRes < 0)
            {
                // 发送错误就该关闭连接了,但是可能缓冲区中还有数据需要被处理
                if (_inBUffer.getReadableSize() > 0)
                {
                    _messageCallBack(shared_from_this(), &_inBUffer);
                    // 这时候就是实际的关闭释放操作了
                    return releaseInLoop();
                }
            }
            // 还需要将读偏移向后移动
            _outBuffer.moveReadOffset(writeRes);

            // 如果发送缓冲区没有数据可写,就要关闭写事件监控
            if (_outBuffer.getReadableSize() == 0)
            {
                _channel.closeWriteAbleEvent();
                // 如果当前是连接待关闭状态,则有数据就发送完数据释放连接,没有数据就直接释放连接
                if (_status == DISCONNECTING)
                {
                    return releaseInLoop();
                }
            }
        }

        // 描述符触发挂断事件
        void handleClose()
        {
            // 一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,处理完毕以后就关闭连接
            if(_inBUffer.getReadableSize() > 0)
            {
                _messageCallBack(shared_from_this(), &_inBUffer);
            }
            return releaseInLoop();
        }

        // 描述符触发出错事件
        void handleError()
        {
            handleClose();
        }

        // 描述符触发任意事件
        void handleEvent()
        {
            // 1.刷新连接的活跃度---延迟定时销毁任务
            if(_inactiveReleaseFlag == true)
            {
                _eventLoop->refreshTimer(_connectionID);
            }
            // 2.调用组件使用者的任意事件回调
            if(_eventCallBack)
            {
                _eventCallBack(shared_from_this());
            }
        }

    private:
        uint64_t _connectionID;    // 连接的唯一ID,便于连接的管理和查找
        uint64_t _timerID;         // 定时器ID,必须是唯一的,这块为了简化操作,使用_connectionID作为定时器ID
        int _sockfd;               // 连接关联的文件描述符
        bool _inactiveReleaseFlag; // 连接是否启动非活跃销毁的判断标志,默认为false表示关闭
        EventLoop *_eventLoop;     // 连接所关联的一个EventLoop
        ConnectionStatus _status;  // 连接的状态
        Socket _socket;            // 套接字操作管理
        Channel _channel;          // 连接的事件管理
        Buffer _inBUffer;          // 输入缓冲区---存放从socket中读取到的数据
        Buffer _outBuffer;         // 输出缓冲区---存放要发送给对端的数据
        ConnectedCallBack _connectedCallBack;
        MessageCallBack _messageCallBack;
        CloseCallBack _closeCallBack;
        EventCallBack _eventCallBack;

        // 这是组件内的连接关闭回调函数,是组件内设置的,因为服务器组件内会把所有的连接管理起来
        // 一旦某个连接要关闭了,就应该从管理的地方移除掉自己的信息
        CloseCallBack _serverCloseCallBack;
        Any _context; // 请求的接收处理上下文
    };
}

二、TcpServer模块

TcpServer模块就是muduo库Tcp服务器的最后一个模块了,其实有了上面介绍的Connection模块的设计思路,TcpServer模块就变得非常简单了,在我看来这个模块更多的是整合和封装,最后提供接口给上层协议去调用。首先来看一下TcpServer类的成员变量:

private:
    uint64_t _nextID;                                                              // 自动增长的连接ID
    uint16_t _port;
    int _timeout;                                                                  // 非活跃连接的统计时间
    bool _inactiveReleaseFlag;                                                     // 是否启动超时销毁连接的标志
    EventLoop _baseLoop;                                                           // 主线程的EventLoop对象,负责监听事件的处理
    Acceptor _acceptor;                                                            // 监听套接字的管理对象
    LoopThreadPool _threadPool;                                                    // 从属EventLoop的线程池
    std::unordered_map<uint64_t, ns_connection::ConnectionSharedPtr> _connections; // 保存管理所有连接对应的shared_ptr对象

    ns_connection::ConnectedCallBack _connectedCallBack;
    ns_connection::MessageCallBack _messageCallBack;
    ns_connection::CloseCallBack _closeCallBack;
    ns_connection::EventCallBack _eventCallBack;

TcpServer类有一个EventLoop对象是主Reactor,这个Reactor只负责监控和接收连接的到来,其实也就是说TcpServer模块只负责接收新连接,不进行连接的数据接收和转发,连接数据接收是Connection模块完成的事情,TcpServer模块只将新连接分派给从属Reactor去管理。除此之外,TcpServer类还有四类回调函数,这四类回调函数其实是给上层协议去设置的,而且准确地说这不是TcpServer的回调函数,而是Connection的回调函数。

接下来再来看看TcpServer类的构造函数,在构造函数中,我们要通过Acceptor对象设置接收连接的回调函数,绑定的函数是TcpServer类实现的newConnection。当接收到新连接时,就会调用newConnection进行新连接的处理。设置好回调函数以后,就开始accept监控连接。

    TcpServer(uint16_t port)
        : _port(port), _nextID(0), _inactiveReleaseFlag(false), 
        _acceptor(&_baseLoop, port), _threadPool(&_baseLoop)
    {
        _acceptor.setAcceptCallBack(std::bind(&TcpServer::newConnection, this, std::placeholders::_1));
        _acceptor.startListen();
    }

接下来就是TcpServer最核心的函数newConnection了,这个函数会在TcpServer接收到新连接以后调用,函数的功能就是为连接创建Connection对象,并设置Connection的回调函数,然后启动连接的监控。这里创建新Connection对象绑定的EventLoop对象,是从线程池中选取的子线程,因为我们的从属Reactor是与子线程一一绑定的。

    // 为新连接构造一个Connection进行管理
    void newConnection(int fd)
    {
        _nextID++;
        ns_connection::ConnectionSharedPtr conn(new ns_connection::Connection(_threadPool.nextEventLoop(), _nextID, fd));
        conn->setMessageCallBack(_messageCallBack);
        conn->setCloseCallBack(_closeCallBack);
        conn->setConnectedCallBack(_connectedCallBack);
        conn->setEventCallBack(_eventCallBack);
        conn->setServerClosedCallBack(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));
        if (_inactiveReleaseFlag)
        {
            conn->startInactiveRelease(_timeout);
        }
        conn->established();
        _connections.insert(std::make_pair(_nextID, conn));
    }

至此TcpServer模块的核心部分也介绍完毕了,剩下的接口比较简单,可以直接看完整代码:

class TcpServer
{
public:
    TcpServer(uint16_t port)
        : _port(port), _nextID(0), _inactiveReleaseFlag(false), 
        _acceptor(&_baseLoop, port), _threadPool(&_baseLoop)
    {
        _acceptor.setAcceptCallBack(std::bind(&TcpServer::newConnection, this, std::placeholders::_1));
        _acceptor.startListen();
    }

    void setThreadCount(int count)
    {
        _threadPool.setThreadCount(count);
    }

    void setConnectedCallBack(const ns_connection::ConnectedCallBack &callBack)
    {
        _connectedCallBack = callBack;
    }

    void setMessageCallBack(const ns_connection::MessageCallBack &callBack)
    {
        _messageCallBack = callBack;
    }

    void setCloseCallBack(const ns_connection::CloseCallBack &callBack)
    {
        _closeCallBack = callBack;
    }

    void setEventCallBack(const ns_connection::EventCallBack &callBack)
    {
        _eventCallBack = callBack;
    }

    void startInactiveRelease(int timeout)
    {
        _timeout = timeout;
        _inactiveReleaseFlag = true;
    }

    void start()
    {
        _threadPool.create();
        _baseLoop.start();
    }

    // 用于添加定时任务
    void runAfter(const Functor &task, int delay)
    {
        _baseLoop.runInLoop(std::bind(&TcpServer::runAfterInLoop, this, task, delay));
    }

private:
    // 为新连接构造一个Connection进行管理
    void newConnection(int fd)
    {
        _nextID++;
        ns_connection::ConnectionSharedPtr conn(new ns_connection::Connection(_threadPool.nextEventLoop(), _nextID, fd));
        conn->setMessageCallBack(_messageCallBack);
        conn->setCloseCallBack(_closeCallBack);
        conn->setConnectedCallBack(_connectedCallBack);
        conn->setEventCallBack(_eventCallBack);
        conn->setServerClosedCallBack(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));
        if (_inactiveReleaseFlag)
        {
            conn->startInactiveRelease(_timeout);
        }
        conn->established();
        _connections.insert(std::make_pair(_nextID, conn));
    }

    // 从管理Connection的_connections中移除连接信息
    void removeConnection(const ns_connection::ConnectionSharedPtr &conn)
    {
        _baseLoop.runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
    }

    void removeConnectionInLoop(const ns_connection::ConnectionSharedPtr &conn)
    {
        int id = conn->getConnectionID();
        auto iter = _connections.find(id);
        if (iter != _connections.end())
        {
            _connections.erase(iter);
        }
    }

    void runAfterInLoop(const Functor &task, int delay)
    {
        _nextID++;
        _baseLoop.addTimer(_nextID, delay, task);
    }

private:
    uint64_t _nextID;                                                              // 自动增长的连接ID
    uint16_t _port;
    int _timeout;                                                                  // 非活跃连接的统计时间
    bool _inactiveReleaseFlag;                                                     // 是否启动超时销毁连接的标志
    EventLoop _baseLoop;                                                           // 主线程的EventLoop对象,负责监听事件的处理
    Acceptor _acceptor;                                                            // 监听套接字的管理对象
    LoopThreadPool _threadPool;                                                    // 从属EventLoop的线程池
    std::unordered_map<uint64_t, ns_connection::ConnectionSharedPtr> _connections; // 保存管理所有连接对应的shared_ptr对象

    ns_connection::ConnectedCallBack _connectedCallBack;
    ns_connection::MessageCallBack _messageCallBack;
    ns_connection::CloseCallBack _closeCallBack;
    ns_connection::EventCallBack _eventCallBack;
};

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>