前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(二)Reactor模式

(二)Reactor模式

作者头像
范蠡
发布2018-04-13 15:06:55
1.1K0
发布2018-04-13 15:06:55
举报

最近一直在看游双的《高性能linux服务器编程》一书,下载链接: http://download.csdn.net/detail/analogous_love/9673008

书上是这么介绍Reactor模式的:

按照这个思路,我写个简单的练习:

代码语言:javascript
复制
  1/**   
  2  *@desc:用reactor模式练习服务器程序,main.cpp
  3  *@author: zhangyl
  4  *@date:   2016.11.23
  5  */  
  6  #include <iostream>
  7  #include <string.h>
  8  #include <sys/types.h>
  9  #include <sys/socket.h>
 10  #include <netinet/in.h>
 11  #include <arpa/inet.h>    //for htonl() and htons()
 12  #include <unistd.h>
 13  #include <fcntl.h>
 14  #include <sys/epoll.h>
 15  #include <signal.h>    //for signal()
 16  #include <pthread.h>
 17  #include <semaphore.h>
 18  #include <list>
 19  #include <errno.h>
 20  #include <time.h>
 21  #include <sstream>
 22  #include <iomanip>     //for std::setw()/setfill()
 23  #include <stdlib.h>  
 24
 25  #define WORKER_THREAD_NUM   5  
 26  #define min(a, b) ((a <= b) ? (a) : (b))   
 27  int g_epollfd = 0;
 28  bool g_bStop = false;
 29  int g_listenfd = 0;
 30  pthread_t g_acceptthreadid = 0;
 31  pthread_t g_threadid[WORKER_THREAD_NUM] = { 0 };  
 32  pthread_cond_t g_acceptcond;
 33  pthread_mutex_t g_acceptmutex;  
 34  pthread_cond_t g_cond /*= PTHREAD_COND_INITIALIZER*/;  
 35  pthread_mutex_t g_mutex /*= PTHREAD_MUTEX_INITIALIZER*/;  
 36  pthread_mutex_t g_clientmutex;  
 37  std::list<int> g_listClients;  
 38  void prog_exit(int signo)
 39  {  
 40    ::signal(SIGINT, SIG_IGN);  
 41    ::signal(SIGKILL, SIG_IGN);  
 42    ::signal(SIGTERM, SIG_IGN);  
 43
 44    std::cout << "program recv signal " << signo
 45              << " to exit." << std::endl;  
 46
 47    g_bStop = true;  
 48
 49    ::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, g_listenfd, NULL);  
 50
 51    //TODO: 是否需要先调用shutdown()一下?  
 52    ::shutdown(g_listenfd, SHUT_RDWR);  
 53    ::close(g_listenfd);  
 54    ::close(g_epollfd);  
 55
 56    ::pthread_cond_destroy(&g_acceptcond);  
 57    ::pthread_mutex_destroy(&g_acceptmutex);  
 58
 59    ::pthread_cond_destroy(&g_cond);  
 60    ::pthread_mutex_destroy(&g_mutex);  
 61
 62    ::pthread_mutex_destroy(&g_clientmutex);
 63  }  
 64  bool create_server_listener(const char* ip, short port)
 65  {  
 66    g_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
 67    if (g_listenfd == -1)  
 68        return false;  
 69
 70    int on = 1;  
 71    ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEADDR,
 72                 (char *)&on, sizeof(on));  
 73    ::setsockopt(g_listenfd, SOL_SOCKET, SO_REUSEPORT,
 74                 (char *)&on, sizeof(on));  
 75
 76    struct sockaddr_in servaddr;  
 77    memset(&servaddr, 0, sizeof(servaddr));   
 78    servaddr.sin_family = AF_INET;  
 79    servaddr.sin_addr.s_addr = inet_addr(ip);  
 80    servaddr.sin_port = htons(port);  
 81    if (::bind(g_listenfd, (sockaddr *)&servaddr,sizeof(servaddr)) == -1)  
 82        return false;  
 83
 84    if (::listen(g_listenfd, 50) == -1)  
 85        return false;  
 86
 87    g_epollfd = ::epoll_create(1);  
 88    if (g_epollfd == -1)  
 89        return false;  
 90
 91    struct epoll_event e;  
 92    memset(&e, 0, sizeof(e));  
 93    e.events = EPOLLIN | EPOLLRDHUP;  
 94    e.data.fd = g_listenfd;  
 95    if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, g_listenfd, &e) == -1)  
 96        return false;  
 97
 98    return true;
 99  }  
100  void release_client(int clientfd)
101  {  
102    if (::epoll_ctl(g_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
103        std::cout << "release client socket failed as call epoll_ctl failed"
104                  << std::endl;  
105
106    ::close(clientfd);
107  }  
108  void* accept_thread_func(void* arg)
109  {     
110    while (!g_bStop)  
111    {  
112        ::pthread_mutex_lock(&g_acceptmutex);  
113        ::pthread_cond_wait(&g_acceptcond, &g_acceptmutex);  
114        //::pthread_mutex_lock(&g_acceptmutex);  
115
116        //std::cout << "run loop in accept_thread_func" << std::endl;  
117
118        struct sockaddr_in clientaddr;  
119        socklen_t addrlen;  
120        int newfd = ::accept(g_listenfd,
121                             (struct sockaddr *)&clientaddr, &addrlen);  
122        ::pthread_mutex_unlock(&g_acceptmutex);  
123        if (newfd == -1)  
124            continue;  
125
126        std::cout << "new client connected: "
127                  << ::inet_ntoa(clientaddr.sin_addr) << ":"
128                  << ::ntohs(clientaddr.sin_port) << std::endl;  
129
130        //将新socket设置为non-blocking  
131        int oldflag = ::fcntl(newfd, F_GETFL, 0);  
132        int newflag = oldflag | O_NONBLOCK;  
133        if (::fcntl(newfd, F_SETFL, newflag) == -1)  
134        {  
135            std::cout << "fcntl error, oldflag =" << oldflag
136                      << ", newflag = " << newflag << std::endl;  
137            continue;  
138        }  
139
140        struct epoll_event e;  
141        memset(&e, 0, sizeof(e));  
142        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
143        e.data.fd = newfd;  
144        if (::epoll_ctl(g_epollfd, EPOLL_CTL_ADD, newfd, &e) == -1)  
145        {  
146            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
147        }  
148    }  
149
150    return NULL;
151  }  
152
153  void* worker_thread_func(void* arg)
154  {     
155    while (!g_bStop)  
156    {  
157        int clientfd;  
158        ::pthread_mutex_lock(&g_clientmutex);  
159        while (g_listClients.empty())  
160            ::pthread_cond_wait(&g_cond, &g_clientmutex);  
161        clientfd = g_listClients.front();  
162        g_listClients.pop_front();    
163        pthread_mutex_unlock(&g_clientmutex);  
164
165        //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
166        std::cout << std::endl;  
167
168        std::string strclientmsg;  
169        char buff[256];  
170        bool bError = false;  
171        while (true)  
172        {  
173            memset(buff, 0, sizeof(buff));  
174            int nRecv = ::recv(clientfd, buff, 256, 0);  
175            if (nRecv == -1)  
176            {  
177                if (errno == EWOULDBLOCK)  
178                    break;  
179                else  
180                {  
181                    std::cout << "recv error, client disconnected, fd = "
182                              << clientfd << std::endl;  
183                    release_client(clientfd);  
184                    bError = true;  
185                    break;  
186                }  
187
188            }  
189            //对端关闭了socket,这端也关闭。  
190            else if (nRecv == 0)  
191            {  
192                std::cout << "peer closed, client disconnected, fd = "
193                          << clientfd << std::endl;  
194                release_client(clientfd);  
195                bError = true;  
196                break;  
197            }  
198
199            strclientmsg += buff;  
200        }  
201
202        //出错了,就不要再继续往下执行了  
203        if (bError)  
204            continue;  
205
206        std::cout << "client msg: " << strclientmsg;  
207
208        //将消息加上时间标签后发回  
209        time_t now = time(NULL);  
210        struct tm* nowstr = localtime(&now);  
211        std::ostringstream ostimestr;  
212        ostimestr << "[" << nowstr->tm_year + 1900 << "-"   
213                  << std::setw(2) << std::setfill('0')
214                  << nowstr->tm_mon + 1 << "-"   
215                  << std::setw(2) << std::setfill('0')
216                  << nowstr->tm_mday << " "  
217                  << std::setw(2) << std::setfill('0')
218                  << nowstr->tm_hour << ":"   
219                  << std::setw(2) << std::setfill('0')
220                  << nowstr->tm_min << ":"   
221                  << std::setw(2) << std::setfill('0')
222                  << nowstr->tm_sec << "]server reply: ";  
223
224        strclientmsg.insert(0, ostimestr.str());  
225
226        while (true)  
227        {  
228            int nSent = ::send(clientfd, strclientmsg.c_str(), 
229                               strclientmsg.length(), 0);  
230            if (nSent == -1)  
231            {  
232                if (errno == EWOULDBLOCK)  
233                {  
234                    ::sleep(10);  
235                    continue;  
236                }  
237                else  
238                {  
239                    std::cout << "send error, fd = "
240                              << clientfd << std::endl;  
241                    release_client(clientfd);  
242                    break;  
243                }  
244
245            }            
246
247            std::cout << "send: " << strclientmsg;  
248            strclientmsg.erase(0, nSent);  
249
250            if (strclientmsg.empty())  
251                break;  
252        }  
253    }  
254
255    return NULL;
256  }  
257  void daemon_run()
258  {  
259    int pid;  
260    signal(SIGCHLD, SIG_IGN);  
261    //1)在父进程中,fork返回新创建子进程的进程ID;  
262    //2)在子进程中,fork返回0;  
263    //3)如果出现错误,fork返回一个负值;  
264    pid = fork();  
265    if (pid < 0)  
266    {  
267        std:: cout << "fork error" << std::endl;  
268        exit(-1);  
269    }  
270    //父进程退出,子进程独立运行  
271    else if (pid > 0) {  
272        exit(0);  
273    }  
274    //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
275    //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
276    //执行setsid()之后,child将重新获得一个新的会话(session)id。  
277    //这时parent退出之后,将不会影响到child了。  
278    setsid();  
279    int fd;  
280    fd = open("/dev/null", O_RDWR, 0);  
281    if (fd != -1)  
282    {  
283        dup2(fd, STDIN_FILENO);  
284        dup2(fd, STDOUT_FILENO);  
285        dup2(fd, STDERR_FILENO);  
286    }  
287    if (fd > 2)  
288        close(fd);  
289   }  
290
291  int main(int argc, char* argv[])
292  {    
293    short port = 0;  
294    int ch;  
295    bool bdaemon = false;  
296    while ((ch = getopt(argc, argv, "p:d")) != -1)  
297    {  
298        switch (ch)  
299        {  
300        case 'd':  
301            bdaemon = true;  
302            break;  
303        case 'p':  
304            port = atol(optarg);  
305            break;  
306        }  
307    }  
308
309    if (bdaemon)  
310        daemon_run();  
311
312
313    if (port == 0)  
314        port = 12345;  
315
316    if (!create_server_listener("0.0.0.0", port))  
317    {  
318        std::cout << "Unable to create listen server: ip=0.0.0.0, port="
319                  << port << "." << std::endl;  
320        return -1;  
321    }  
322
323
324    //设置信号处理  
325    signal(SIGCHLD, SIG_DFL);  
326    signal(SIGPIPE, SIG_IGN);  
327    signal(SIGINT, prog_exit);  
328    signal(SIGKILL, prog_exit);  
329    signal(SIGTERM, prog_exit);  
330
331    ::pthread_cond_init(&g_acceptcond, NULL);  
332    ::pthread_mutex_init(&g_acceptmutex, NULL);  
333
334    ::pthread_cond_init(&g_cond, NULL);  
335    ::pthread_mutex_init(&g_mutex, NULL);  
336
337    ::pthread_mutex_init(&g_clientmutex, NULL);  
338
339    ::pthread_create(&g_acceptthreadid, NULL, accept_thread_func, NULL);  
340    //启动工作线程  
341    for (int i = 0; i < WORKER_THREAD_NUM; ++i)  
342    {  
343        ::pthread_create(&g_threadid[i], NULL, worker_thread_func, NULL);  
344    }  
345
346    while (!g_bStop)  
347    {         
348        struct epoll_event ev[1024];  
349        int n = ::epoll_wait(g_epollfd, ev, 1024, 10);  
350        if (n == 0)  
351            continue;  
352        else if (n < 0)  
353        {  
354            std::cout << "epoll_wait error" << std::endl;  
355            continue;  
356        }  
357
358        int m = min(n, 1024);  
359        for (int i = 0; i < m; ++i)  
360        {  
361            //通知接收连接线程接收新连接  
362            if (ev[i].data.fd == g_listenfd)  
363                pthread_cond_signal(&g_acceptcond);  
364            //通知普通工作线程接收数据  
365            else  
366            {                 
367                pthread_mutex_lock(&g_clientmutex);                
368                g_listClients.push_back(ev[i].data.fd);  
369                pthread_mutex_unlock(&g_clientmutex);  
370                pthread_cond_signal(&g_cond);  
371                //std::cout << "signal" << std::endl;  
372            }  
373
374        }  
375
376    }  
377
378    return 0;
379  } 

程序的功能一个简单的echo服务:客户端连接上服务器之后,给服务器发送信息,服务器加上时间戳等信息后返回给客户端。

使用到的知识点有:

  1. 条件变量

2.epoll的边缘触发模式

程序的大致框架是:

  1. 主线程只负责监听侦听socket上是否有新连接,如果有新连接到来,交给一个叫accept的工作线程去接收新连接,并将新连接socket绑定到主线程使用epollfd上去。
  2. 主线程如果侦听到客户端的socket上有可读事件,则通知另外五个工作线程去接收处理客户端发来的数据,并将数据加上时间戳后发回给客户端。
  3. 可以通过传递-p port来设置程序的监听端口号;可以通过传递-d来使程序以daemon模式运行在后台。这也是标准linux daemon模式的书写方法。

程序难点和需要注意的地方是:

  1. 条件变量为了防止虚假唤醒,一定要在一个循环里面调用pthread_cond_wait()函数,我在worker_thread_func()中使用了:
代码语言:javascript
复制
1while (g_listClients.empty())  
2            ::pthread_cond_wait(&g_cond, &g_clientmutex); 

在accept_thread_func()函数里面我没有使用循环,这样会有问题吗?

  1. 使用条件变量pthread_cond_wait()函数的时候一定要先获得与该条件变量相关的mutex,即像下面这样的结构:
代码语言:javascript
复制
1mutex_lock(...);  
2  while (condition is true)  
3    ::pthread_cond_wait(...);  
4  //这里可以有其他代码...  mutex_unlock(...);  
5  //这里可以有其他代码... 

因为pthread_cond_wait()如果阻塞的话,它解锁相关mutex和阻塞当前线程这两个动作加在一起是原子的。

  1. 作为服务器端程序最好对侦听socket调用setsocketopt()设置SO_REUSEADDR和SO_REUSEPORT两个标志,因为服务程序有时候会需要重启(比如调试的时候就会不断重启),如果不设置这两个标志的话,绑定端口时就会调用失败。因为一个端口使用后,即使不再使用,因为四次挥手该端口处于TIME_WAIT状态,有大约2min的MSL(Maximum Segment Lifetime,最大存活期)。这2min内,该端口是不能被重复使用的。你的服务器程序上次使用了这个端口号,接着重启,因为这个缘故,你再次绑定这个端口就会失败(bind函数调用失败)。要不你就每次重启时需要等待2min后再试(这在频繁重启程序调试是难以接收的),或者设置这种SO_REUSEADDR和SO_REUSEPORT立即回收端口使用。

其实,SO_REUSEADDR在windows上和Unix平台上还有些细微的区别,我在libevent源码中看到这样的描述:

代码语言:javascript
复制
 1int evutil_make_listen_socket_reuseable(evutil_socket_t sock)  
 2{  
 3#ifndef WIN32  
 4    int one = 1;  
 5    /* REUSEADDR on Unix means, "don't hang on to this address after the 
 6     * listener is closed."  On Windows, though, it means "don't keep other 
 7     * processes from binding to this address while we're using it.
 8     */  
 9    return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*) &one,  
10        (ev_socklen_t)sizeof(one));  
11#else  
12    return 0;  
13#endif  
14}  

注意注释部分,在Unix平台上设置这个选项意味着,任意进程可以复用该地址;而在windows,不要阻止其他进程复用该地址。也就是在在Unix平台上,如果不设置这个选项,任意进程在一定时间内,不能bind该地址;在windows平台上,在一定时间内,其他进程不能bind该地址,而本进程却可以再次bind该地址。

  1. epoll_wait对新连接socket使用的是边缘触发模式EPOLLET(edge trigger),而不是默认的水平触发模式(level trigger)。因为如果采取水平触发模式的话,主线程检测到某个客户端socket数据可读时,通知工作线程去收取该socket上的数据,这个时候主线程继续循环,只要在工作线程没有将该socket上数据全部收完,或者在工作线程收取数据的过程中,客户端有新数据到来,主线程会继续发通知(通过pthread_cond_signal())函数,再次通知工作线程收取数据。这样会可能导致多个工作线程同时调用recv函数收取该客户端socket上的数据,这样产生的结果将会导致数据错乱。

相反,采取边缘触发模式,只有等某个工作线程将那个客户端socket上数据全部收取完毕,主线程的epoll_wait才可能会再次触发来通知工作线程继续收取那个客户端socket新来的数据。

  1. 代码中有这样一行:

//gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来 std::cout << std::endl; 如果不加上这一行,正常运行服务器程序,程序中要打印到控制台的信息都会打印出来,但是如果用gdb调试状态下,程序的所有输出就不显示了。我不知道这是不是gdb的一个bug,所以这里加上std::endl来输出一个换行符并flush标准输出,让输出显示出来。(std::endl不仅是输出一个换行符而且是同时刷新输出,相当于fflush()函数)。

程序我部署起来了,你可以使用linux的nc命令或自己写程序连接服务器来查看程序效果,当然也可以使用telnet命令,方法:

linux:

nc 120.55.94.78 12345

telnet 120.55.94.78 12345

然后就可以给服务器自由发送数据了,服务器会给你发送的信息加上时间戳返回给你。效果如图:

另外我将这个代码改写了成纯C++11版本,使用CMake编译,为了支持编译必须加上这-std=c++11:

CMakeLists.txt代码如下:

代码语言:javascript
复制
 1cmake_minimum_required(VERSION 2.8)  
 2  PROJECT(myreactorserver)  
 3  AUX_SOURCE_DIRECTORY(./ SRC_LIST)
 4  SET(EXECUTABLE_OUTPUT_PATH ./)  
 5  ADD_DEFINITIONS(-g -W -Wall -Wno-deprecated
 6                  -DLINUX -D_REENTRANT -D_FILE_OFFSET_BITS=64
 7                  -DAC_HAS_INFO -DAC_HAS_WARNING -DAC_HAS_ERROR 
 8                  -DAC_HAS_CRITICAL -DTIXML_USE_STL
 9                  -DHAVE_CXX_STDHEADERS ${CMAKE_CXX_FLAGS}
10                  -std=c++11)  
11  INCLUDE_DIRECTORIES(  ./  )
12  LINK_DIRECTORIES(  ./  )  
13  set(  main.cpp  myreator.cpp  )  
14  ADD_EXECUTABLE(myreactorserver ${SRC_LIST})  
15  TARGET_LINK_LIBRARIES(myreactorserver pthread)  

myreactor.h文件内容:

代码语言:javascript
复制
 1/**
 2 *@desc: myreactor头文件, myreactor.h
 3 *@author: zhangyl
 4 *@date: 2016.12.03
 5 */
 6  #ifndef __MYREACTOR_H__
 7  #define __MYREACTOR_H__  
 8  #include <list>
 9  #include <memory>
10  #include <thread>
11  #include <mutex>
12  #include <condition_variable>  
13  #define WORKER_THREAD_NUM   5  
14  class CMyReactor
15  {
16  public:  
17    CMyReactor();  
18    ~CMyReactor();  
19
20    bool init(const char* ip, short nport);  
21    bool uninit();  
22
23    bool close_client(int clientfd);  
24
25    static void* main_loop(void* p);  
26  private:  
27    //no copyable  
28    CMyReactor(const CMyReactor& rhs);  
29    CMyReactor& operator = (const CMyReactor& rhs);  
30
31    bool create_server_listener(const char* ip, short port);  
32
33    static void accept_thread_proc(CMyReactor* pReatcor);  
34    static void worker_thread_proc(CMyReactor* pReatcor);  
35  private:  
36    //C11语法可以在这里初始化  
37    int                          m_listenfd = 0;  
38    int                          m_epollfd  = 0;  
39    bool                         m_bStop    = false;  
40
41    std::shared_ptr<std::thread> m_acceptthread;  
42    std::shared_ptr<std::thread> m_workerthreads[WORKER_THREAD_NUM];  
43
44    std::condition_variable      m_acceptcond;  
45    std::mutex                   m_acceptmutex;  
46
47    std::condition_variable      m_workercond ;  
48    std::mutex                   m_workermutex;  
49
50    std::list<int>                 m_listClients;
51  };  
52  #endif //!__MYREACTOR_H__ 

myreactor.cpp文件内容:

代码语言:javascript
复制
  1 /**
  2  *@desc: myreactor实现文件, myreactor.cpp
  3  *@author: zhangyl
  4  *@date: 2016.12.03
  5  */  #include "myreactor.h"
  6  #include <iostream>
  7  #include <string.h>
  8  #include <sys/types.h>
  9  #include <sys/socket.h>
 10  #include <netinet/in.h>
 11  #include <arpa/inet.h>  //for htonl() and htons()
 12  #include <fcntl.h>
 13  #include <sys/epoll.h>
 14  #include <list>
 15  #include <errno.h>
 16  #include <time.h>
 17  #include <sstream>
 18  #include <iomanip>   //for std::setw()/setfill()
 19  #include <unistd.h>  
 20  #define min(a, b) ((a <= b) ? (a) : (b))  
 21  CMyReactor::CMyReactor()
 22  {  
 23    //m_listenfd = 0;  
 24    //m_epollfd = 0;  
 25    //m_bStop = false;
 26  }  
 27  CMyReactor::~CMyReactor()
 28  {  
 29  }  
 30  bool CMyReactor::init(const char* ip, short nport)
 31  {  
 32    if (!create_server_listener(ip, nport))  
 33    {  
 34        std::cout << "Unable to bind: " << ip
 35                  << ":" << nport << "." << std::endl;  
 36        return false;  
 37    }  
 38
 39
 40    std::cout << "main thread id = " << std::this_thread::get_id()
 41              << std::endl;  
 42
 43    //启动接收新连接的线程  
 44    m_acceptthread.reset(new std::thread(CMyReactor::accept_thread_proc, this));  
 45
 46    //启动工作线程  
 47    for (auto& t : m_workerthreads)  
 48    {  
 49        t.reset(new std::thread(CMyReactor::worker_thread_proc, this));  
 50    }  
 51
 52
 53    return true;
 54  }  
 55  bool CMyReactor::uninit()
 56  {  
 57    m_bStop = true;  
 58    m_acceptcond.notify_one();  
 59    m_workercond.notify_all();  
 60
 61    m_acceptthread->join();  
 62    for (auto& t : m_workerthreads)  
 63    {  
 64        t->join();  
 65    }  
 66
 67    ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, m_listenfd, NULL);  
 68
 69    //TODO: 是否需要先调用shutdown()一下?  
 70    ::shutdown(m_listenfd, SHUT_RDWR);  
 71    ::close(m_listenfd);  
 72    ::close(m_epollfd);  
 73
 74    return true;
 75  }  
 76  bool CMyReactor::close_client(int clientfd)
 77  {  
 78    if (::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, clientfd, NULL) == -1)  
 79    {  
 80        std::cout << "close client socket failed as call epoll_ctl failed"
 81                  << std::endl;  
 82        //return false;  
 83    }  
 84
 85
 86    ::close(clientfd);  
 87
 88    return true;
 89  }  
 90
 91  void* CMyReactor::main_loop(void* p)
 92  {  
 93    std::cout << "main thread id = "
 94              << std::this_thread::get_id() << std::endl;  
 95
 96    CMyReactor* pReatcor = static_cast<CMyReactor*>(p);  
 97
 98    while (!pReatcor->m_bStop)  
 99    {  
100        struct epoll_event ev[1024];  
101        int n = ::epoll_wait(pReatcor->m_epollfd, ev, 1024, 10);  
102        if (n == 0)  
103            continue;  
104        else if (n < 0)  
105        {  
106            std::cout << "epoll_wait error" << std::endl;  
107            continue;  
108        }  
109
110        int m = min(n, 1024);  
111        for (int i = 0; i < m; ++i)  
112        {  
113            //通知接收连接线程接收新连接  
114            if (ev[i].data.fd == pReatcor->m_listenfd)  
115                pReatcor->m_acceptcond.notify_one();  
116            //通知普通工作线程接收数据  
117            else  
118            {  
119                {  
120                    std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
121                    pReatcor->m_listClients.push_back(ev[i].data.fd);  
122                }  
123
124                pReatcor->m_workercond.notify_one();  
125                //std::cout << "signal" << std::endl;  
126            }// end if  
127
128        }// end for-loop  
129    }// end while  
130
131    std::cout << "main loop exit ..." << std::endl;  
132
133    return NULL;
134  }  
135  void CMyReactor::accept_thread_proc(CMyReactor* pReatcor)
136  {  
137    std::cout << "accept thread, thread id = "
138              << std::this_thread::get_id() << std::endl;  
139
140    while (true)  
141    {  
142        int newfd;  
143        struct sockaddr_in clientaddr;  
144        socklen_t addrlen;  
145        {  
146            std::unique_lock<std::mutex> guard(pReatcor->m_acceptmutex);  
147            pReatcor->m_acceptcond.wait(guard);  
148            if (pReatcor->m_bStop)  
149                break;  
150
151            //std::cout << "run loop in accept_thread_proc" << std::endl;  
152
153            newfd = ::accept(pReatcor->m_listenfd,
154                              (struct sockaddr *)&clientaddr, &addrlen);  
155        }  
156        if (newfd == -1)  
157            continue;  
158
159        std::cout << "new client connected: "
160                  << ::inet_ntoa(clientaddr.sin_addr) << ":"      
161                  << ::ntohs(clientaddr.sin_port) << std::endl;  
162
163        //将新socket设置为non-blocking  
164        int oldflag = ::fcntl(newfd, F_GETFL, 0);  
165        int newflag = oldflag | O_NONBLOCK;  
166        if (::fcntl(newfd, F_SETFL, newflag) == -1)  
167        {  
168            std::cout << "fcntl error, oldflag =" << oldflag
169                      << ", newflag = " << newflag << std::endl;  
170            continue;  
171        }  
172
173        struct epoll_event e;  
174        memset(&e, 0, sizeof(e));  
175        e.events = EPOLLIN | EPOLLRDHUP | EPOLLET;  
176        e.data.fd = newfd;  
177        if (::epoll_ctl(pReatcor->m_epollfd, 
178            EPOLL_CTL_ADD, newfd, &e) == -1)  
179        {  
180            std::cout << "epoll_ctl error, fd =" << newfd << std::endl;  
181        }  
182    }  
183
184    std::cout << "accept thread exit ..." << std::endl;
185  }  
186  void CMyReactor::worker_thread_proc(CMyReactor* pReatcor)
187  {  
188    std::cout << "new worker thread, thread id = "
189              << std::this_thread::get_id() << std::endl;  
190
191    while (true)  
192    {  
193        int clientfd;  
194        {  
195            std::unique_lock<std::mutex> guard(pReatcor->m_workermutex);  
196            while (pReatcor->m_listClients.empty())  
197            {  
198                if (pReatcor->m_bStop)  
199                {  
200                    std::cout << "worker thread exit ..." << std::endl;  
201                    return;  
202                }  
203
204                pReatcor->m_workercond.wait(guard);  
205            }  
206
207            clientfd = pReatcor->m_listClients.front();  
208            pReatcor->m_listClients.pop_front();  
209        }  
210
211        //gdb调试时不能实时刷新标准输出,用这个函数刷新标准输出,使信息在屏幕上实时显示出来  
212        std::cout << std::endl;  
213
214        std::string strclientmsg;  
215        char buff[256];  
216        bool bError = false;  
217        while (true)  
218        {  
219            memset(buff, 0, sizeof(buff));  
220            int nRecv = ::recv(clientfd, buff, 256, 0);  
221            if (nRecv == -1)  
222            {  
223                if (errno == EWOULDBLOCK)  
224                    break;  
225                else  
226                {  
227                    std::cout << "recv error, client disconnected, fd = "
228                              << clientfd << std::endl;  
229                    pReatcor->close_client(clientfd);  
230                    bError = true;  
231                    break;  
232                }  
233
234            }  
235            //对端关闭了socket,这端也关闭。  
236            else if (nRecv == 0)  
237            {  
238                std::cout << "peer closed, client disconnected, fd = "
239                          << clientfd << std::endl;  
240                pReatcor->close_client(clientfd);  
241                bError = true;  
242                break;  
243            }  
244
245            strclientmsg += buff;  
246        }  
247
248        //出错了,就不要再继续往下执行了  
249        if (bError)  
250            continue;  
251
252        std::cout << "client msg: " << strclientmsg;  
253
254        //将消息加上时间标签后发回  
255        time_t now = time(NULL);  
256        struct tm* nowstr = localtime(&now);  
257        std::ostringstream ostimestr;  
258        ostimestr << "[" << nowstr->tm_year + 1900 << "-"  
259            << std::setw(2) << std::setfill('0') << nowstr->tm_mon + 1 << "-"  
260            << std::setw(2) << std::setfill('0') << nowstr->tm_mday << " "  
261            << std::setw(2) << std::setfill('0') << nowstr->tm_hour << ":"  
262            << std::setw(2) << std::setfill('0') << nowstr->tm_min << ":"  
263            << std::setw(2) << std::setfill('0') << nowstr->tm_sec << "]server reply: ";  
264
265        strclientmsg.insert(0, ostimestr.str());  
266
267        while (true)  
268        {  
269            int nSent = ::send(clientfd, strclientmsg.c_str(), 
270                               strclientmsg.length(), 0);  
271            if (nSent == -1)  
272            {  
273                if (errno == EWOULDBLOCK)  
274                {  
275                    std::this_thread::sleep_for(std::chrono::milliseconds(10));  
276                    continue;  
277                }  
278                else  
279                {  
280                    std::cout << "send error, fd = "
281                              << clientfd << std::endl;  
282                    pReatcor->close_client(clientfd);  
283                    break;  
284                }  
285
286            }  
287
288            std::cout << "send: " << strclientmsg;  
289            strclientmsg.erase(0, nSent);  
290
291            if (strclientmsg.empty())  
292                break;  
293        }  
294    }
295  }  
296  bool CMyReactor::create_server_listener(const char* ip, short port)
297  {  
298    m_listenfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);  
299    if (m_listenfd == -1)  
300        return false;  
301
302    int on = 1;  
303    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR,
304                (char *)&on, sizeof(on));  
305    ::setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEPORT,
306                (char *)&on, sizeof(on));  
307
308    struct sockaddr_in servaddr;  
309    memset(&servaddr, 0, sizeof(servaddr));  
310    servaddr.sin_family = AF_INET;  
311    servaddr.sin_addr.s_addr = inet_addr(ip);  
312    servaddr.sin_port = htons(port);  
313    if (::bind(m_listenfd, (sockaddr *)&servaddr, 
314         sizeof(servaddr)) == -1)  
315        return false;  
316
317    if (::listen(m_listenfd, 50) == -1)  
318        return false;  
319
320    m_epollfd = ::epoll_create(1);  
321    if (m_epollfd == -1)  
322        return false;  
323
324    struct epoll_event e;  
325    memset(&e, 0, sizeof(e));  
326    e.events = EPOLLIN | EPOLLRDHUP;  
327    e.data.fd = m_listenfd;  
328    if (::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, m_listenfd, &e) == -1)  
329        return false;  
330
331    return true;
332  }  

main.cpp文件内容:

代码语言:javascript
复制
  1/**  
  2 *@desc:   用reactor模式练习服务器程序 
  3 *@author: zhangyl 
  4 *@date:   2016.12.03 
  5 */  
  6
  7#include <iostream>  
  8#include <signal.h>     //for signal()  
  9#include<unistd.h>  
 10#include <stdlib.h>       //for exit()  
 11#include <sys/types.h>  
 12#include <sys/stat.h>  
 13#include <fcntl.h>  
 14#include "myreactor.h"  
 15
 16CMyReactor g_reator;  
 17
 18void prog_exit(int signo)  
 19{  
 20    std::cout << "program recv signal " << signo
 21              << " to exit." << std::endl;   
 22
 23    g_reator.uninit();  
 24}  
 25
 26void daemon_run()  
 27{  
 28    int pid;  
 29    signal(SIGCHLD, SIG_IGN);  
 30    //1)在父进程中,fork返回新创建子进程的进程ID;  
 31    //2)在子进程中,fork返回0;  
 32    //3)如果出现错误,fork返回一个负值;  
 33    pid = fork();  
 34    if (pid < 0)  
 35    {  
 36        std:: cout << "fork error" << std::endl;  
 37        exit(-1);  
 38    }  
 39    //父进程退出,子进程独立运行  
 40    else if (pid > 0)
 41   {  
 42        exit(0);  
 43    }  
 44    //之前parent和child运行在同一个session里,parent是会话(session)的领头进程,  
 45    //parent进程作为会话的领头进程,如果exit结束执行的话,那么子进程会成为孤儿进程,并被init收养。  
 46    //执行setsid()之后,child将重新获得一个新的会话(session)id。  
 47    //这时parent退出之后,将不会影响到child了。  
 48    setsid();  
 49    int fd;  
 50    fd = open("/dev/null", O_RDWR, 0);  
 51    if (fd != -1)  
 52    {  
 53        dup2(fd, STDIN_FILENO);  
 54        dup2(fd, STDOUT_FILENO);  
 55        dup2(fd, STDERR_FILENO);  
 56    }  
 57    if (fd > 2)  
 58        close(fd);  
 59}  
 60
 61
 62int main(int argc, char* argv[])  
 63{    
 64    //设置信号处理  
 65    signal(SIGCHLD, SIG_DFL);  
 66    signal(SIGPIPE, SIG_IGN);  
 67    signal(SIGINT, prog_exit);  
 68    signal(SIGKILL, prog_exit);  
 69    signal(SIGTERM, prog_exit);  
 70
 71    short port = 0;  
 72    int ch;  
 73    bool bdaemon = false;  
 74    while ((ch = getopt(argc, argv, "p:d")) != -1)  
 75    {  
 76        switch (ch)  
 77        {  
 78        case 'd':  
 79            bdaemon = true;  
 80            break;  
 81        case 'p':  
 82            port = atol(optarg);  
 83            break;  
 84        }  
 85    }  
 86
 87    if (bdaemon)  
 88        daemon_run();  
 89
 90
 91    if (port == 0)  
 92        port = 12345;  
 93
 94
 95    if (!g_reator.init("0.0.0.0", 12345))  
 96        return -1;  
 97
 98    g_reator.main_loop(&g_reator);  
 99
100    return 0;  
101}  

完整实例代码下载地址:

普通版本:https://pan.baidu.com/s/1o82Mkno

C++11版本:https://pan.baidu.com/s/1dEJdrih

本文参与?腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-04-07,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 高性能服务器开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体同步曝光计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com