ThreadPool采用message driven方式,每当select返回一个fd的read请求时,将其包装为Task,丢入ThreadPool的message queue中。等其中的Thread从queue拿出,进行处理。
有三点心得:
1)当read请求已经被包装丢入队列,但还未得到recv处理时,select依然会不停的对这个fd的read进行响应,因此当丢入队列尚未处理时,select应拒绝该fd的read,否则负载越高,越容易重复Task溢出。解决方法是再设一个fd_pending的set,记录此状态下的fd。
2)queue不应是无限长的,在达到一定长度后应拒绝入队请求,以控制负载。同时对队列操作部分要设Critical Section。
3)fd_set的系列操作都是线程不安全的,在多个线程同时操作其时,需要设Critical Section。因此要将fd_set系列操作重新包装,外面加上Enter/Leave Critical Section。
Tuesday, August 29, 2006
Monday, August 21, 2006
似乎终于有点看懂select了
从只会用WSAxxxx到看懂select,还真是一个飞跃,赶快记下来。
select一般用于单线程、并发服务器
SOCKET msock;
fd_set rfds, afds;
msock = passiveTCP( service, QLEN );
FD_ZERO( &afds );
FD_SET( msock, &afds );
while( 1 )
{
memcpy( &rfds, &afds, sizeof( rfds ) );
if( select( FD_SETSIZE, &rfds, (fd_set*)0, (fd_set*)0, (struct timeval *)0 ) == SOCKET_ERROR ) )
errexit;
if( FD_ISSET( msock, &rfds ))
{
SOCKET ssock;
alen = sizeof( fsin );
ssock = accept( msock, (struct sockaddr *)&fsin,&alen );
if( ssock == INVALID_SOCKET )
errexit;
FD_SET( ssock, &afds );
}
for( fdndx=0; fdndx <rfds.fd_count; ++ fdndx )
{
SOCKET fd=rfds.fd_array[fdndx];
if( fd != msock && FD_ISSET( fd, &rfds ))
if( echo(fd) == 0 ) // DO Echo Process
{
closesocket( fd );
FD_CLR( fd, &afds );
}
}
}
Thursday, August 10, 2006
Subscribe to:
Posts (Atom)