00001 #include "socketeventshandler.h"
00002
00003 #include "socketevents.h"
00004
00005 namespace n2nc {
00006 namespace net {
00007
00008 SocketEventsHandler::SocketEventsHandler(){
00009 ::pipe(this->m_wakeup);
00010 ::fcntl(this->m_wakeup[0],F_SETFL,O_NONBLOCK);
00011 ::fcntl(this->m_wakeup[1],F_SETFL,O_NONBLOCK);
00012
00013
00014 }
00015 SocketEventsHandler::~SocketEventsHandler(){
00016 ::close(this->m_wakeup[0]);
00017 ::close(this->m_wakeup[1]);
00018
00019 }
00020
00021
00022 int SocketEventsHandler::add(Socket &sock,check_for_t checkfor, SocketEvents& se){
00023
00024
00025
00026
00027 socket_extra_t *sockinfo = new socket_extra_t();
00028 ::bzero(sockinfo,sizeof(socket_extra_t));
00029 sockinfo->check = checkfor ;
00030 sockinfo->sock = &sock ;
00031 sockinfo->se = &se ;
00032 sockinfo->wait = NULL ;
00033
00034 this->m_lock.lock();
00035
00036
00037 this->m_map_new.insert( std::pair<Socket::fd_t,socket_extra_t*>(sock.getFD(),sockinfo) );
00038 this->m_lock.unlock();
00039 char t_dummy[4] ;
00040 ::write(this->m_wakeup[0],t_dummy,4);
00041 }
00042
00043 int SocketEventsHandler::del(Socket &sock){
00044
00045 if(pthread_equal(pthread_self(),this->m_thread)){
00046 this->m_map_new.find(sock.getFD())->second = NULL ;
00047
00048 }else{
00049 this->m_lock.lock();
00050 this->m_map_new.erase(sock.getFD());
00051 this->m_lock.unlock();
00052 }
00053
00054 char t_dummy[4] ;
00055 ::write(this->m_wakeup[0],t_dummy,4);
00056 }
00057
00058
00059 void* SocketEventsHandler::entry_point(){
00060 this->setName("SKEV");
00061 this->_main_loop();
00062 return NULL ;
00063 }
00064
00065 int SocketEventsHandler::waitForEvents(){
00066
00067 this->run(NULL);
00068
00069 }
00070
00071 int SocketEventsHandler::_main_loop(){
00072 int maxfd ,retval ;
00073 char t_dummy[1024];
00074 fd_set read_fds, write_fds, except_fds ;
00075 Socket *soc ;
00076 std::map<Socket::fd_t,socket_extra_t*>::iterator it ;
00077
00078 while(true){
00079 reinithack:
00080
00081
00082 FD_ZERO(&read_fds);
00083 FD_ZERO(&write_fds);
00084 FD_ZERO(&except_fds);
00085 this->m_max_fd = 0 ;
00086
00087
00088 this->m_lock.lock();
00089 it = this->m_map_new.begin() ;
00090 while(it != this->m_map_new.end()){
00091 if(! it->second){std::cerr << "come aspettavo" << std::endl; this->m_map_new.erase(it);this->m_lock.unlock();goto reinithack; }
00092
00093 if(it->second->check & SocketEventsHandler::NOTHING ){
00094 it++;
00095
00096 continue ;
00097 }
00098
00099 this->tryAddMaxFD(it->first);
00100 if(it->second->check & SocketEventsHandler::READ ) FD_SET(it->first,&read_fds);
00101 if(it->second->check & SocketEventsHandler::WRITE ) FD_SET(it->first,&write_fds);
00102 if(it->second->check & SocketEventsHandler::EXCEPT ) FD_SET(it->first,&except_fds);
00103
00104 it++;
00105 }
00106 this->m_lock.unlock();
00107
00108 FD_SET(this->m_wakeup[0],&read_fds);
00109 this->m_max_fd = std::max(this->m_max_fd, this->m_wakeup[0]);
00110
00111
00112
00113 retval = ::select(this->m_max_fd+1, &read_fds, &write_fds, &except_fds, NULL);
00114 if(retval == -1) std::cerr << "select error" << std::endl ;
00115
00116 int a;
00117 this->m_lock.lock();
00118
00119 it = this->m_map_new.begin() ;
00120 while(it != this->m_map_new.end()){
00121 if(! it->second){std::cerr << "come aspettavo" << std::endl; this->m_map_new.erase(it);this->m_lock.unlock();goto reinithack; }
00122 a = (READ * FD_ISSET(it->first,&read_fds)) + (WRITE * FD_ISSET(it->first,&write_fds)) + (EXCEPT * FD_ISSET(it->first,&except_fds) ) ;
00123 if( a ){
00124 if (it->second->wait) it->second->wait->signal();
00125 if (it->second->se) it->second->se->onSelectEvent(it->second,(check_for_t)a );
00126 }
00127 it++;
00128 }
00129 this->m_lock.unlock();
00130
00131 ::read(this->m_wakeup[0],t_dummy,1024);
00132
00133 }
00134 }
00135
00136 int SocketEventsHandler::computeMaxFD(){
00137
00138 std::map<Socket::fd_t,socket_extra_t*>::iterator it ;
00139 this->m_max_fd = 0 ;
00140 this->tryAddMaxFD( this->m_wakeup[0] );
00141 this->tryAddMaxFD( this->m_wakeup[1] );
00142 it = this->m_map_new.begin() ;
00143 while(it != this->m_map_new.end()){
00144 this->tryAddMaxFD(it->first);
00145 it++;
00146 }
00147 return this->m_max_fd ;
00148 }
00149
00150 int SocketEventsHandler::tryAddMaxFD(Socket::fd_t fd){
00151 return this->m_max_fd = std::max(this->m_max_fd, fd);
00152 }
00153
00154 int SocketEventsHandler::tryAddMaxFD(const std::pair<Socket::fd_t,socket_extra_t*>& mypair){
00155 return this->m_max_fd = std::max(this->m_max_fd, mypair.first );
00156 }
00157
00158
00159 bool SocketEventsHandler::enableSocket(const Socket& sock){
00160 std::map<Socket::fd_t,socket_extra_t*>::iterator tmp_res ;
00161 this->m_lock.lock();
00162 tmp_res = this->m_map_new.find(sock.getFD());
00163 if (tmp_res == this->m_map_new.end() ) {
00164
00165 this->m_lock.unlock();
00166 return false ;
00167 }
00168 tmp_res->second->check = (check_for_t)(tmp_res->second->check & ~NOTHING) ;
00169 this->m_lock.unlock();
00170 return true ;
00171 }
00172
00173 bool SocketEventsHandler::disableSocket(const Socket& sock){
00174 std::map<Socket::fd_t,socket_extra_t*>::iterator tmp_res ;
00175 this->m_lock.lock();
00176 tmp_res = this->m_map_new.find(sock.getFD());
00177 if (tmp_res == this->m_map_new.end() ) {
00178
00179 this->m_lock.unlock();
00180 return false ;
00181 }
00182 tmp_res->second->check = (check_for_t)(tmp_res->second->check | NOTHING) ;
00183 this->m_lock.unlock();
00184 return true ;
00185 }
00186
00187
00188
00189 }
00190 }
00191
00192
00193
00194
00195
00196