芝麻web文件管理V1.00
编辑当前文件:/home/mgatv524/public_html/avenida/views/zeromq-0.0.2.zip
PK eqYV" " extension/zeromq.cnu [ //----------------------------------------------------------------------------- #include "zeromq.h" #include "socket.h" //----------------------------------------------------------------------------- static VALUE module_version( VALUE self ); static VALUE module_select( int argument_count, VALUE* arguments, VALUE self ); //----------------------------------------------------------------------------- VALUE module_declare() //-------------------- { VALUE zmq_module = rb_define_module( "ZeroMQ" ); rb_define_singleton_method( zmq_module, "version", module_version, 0 ); rb_define_singleton_method( zmq_module, "select", module_select, -1 ); rb_define_const (zmq_module, "SNDHWM", INT2NUM (ZMQ_SNDHWM)); rb_define_const (zmq_module, "RCVHWM", INT2NUM (ZMQ_RCVHWM)); rb_define_const (zmq_module, "AFFINITY", INT2NUM (ZMQ_AFFINITY)); rb_define_const (zmq_module, "IDENTITY", INT2NUM (ZMQ_IDENTITY)); rb_define_const (zmq_module, "SUBSCRIBE", INT2NUM (ZMQ_SUBSCRIBE)); rb_define_const (zmq_module, "UNSUBSCRIBE", INT2NUM (ZMQ_UNSUBSCRIBE)); rb_define_const (zmq_module, "RATE", INT2NUM (ZMQ_RATE)); rb_define_const (zmq_module, "RECOVERY_IVL", INT2NUM (ZMQ_RECOVERY_IVL)); rb_define_const (zmq_module, "SNDBUF", INT2NUM (ZMQ_SNDBUF)); rb_define_const (zmq_module, "RCVBUF", INT2NUM (ZMQ_RCVBUF)); rb_define_const (zmq_module, "SNDMORE", INT2NUM (ZMQ_SNDMORE)); rb_define_const (zmq_module, "RCVMORE", INT2NUM (ZMQ_RCVMORE)); rb_define_const (zmq_module, "FD", INT2NUM (ZMQ_FD)); rb_define_const (zmq_module, "EVENTS", INT2NUM (ZMQ_EVENTS)); rb_define_const (zmq_module, "TYPE", INT2NUM (ZMQ_TYPE)); rb_define_const (zmq_module, "LINGER", INT2NUM (ZMQ_LINGER)); rb_define_const (zmq_module, "RECONNECT_IVL", INT2NUM (ZMQ_RECONNECT_IVL)); rb_define_const (zmq_module, "BACKLOG", INT2NUM (ZMQ_BACKLOG)); rb_define_const (zmq_module, "RECONNECT_IVL_MAX", INT2NUM (ZMQ_RECONNECT_IVL_MAX)); rb_define_const (zmq_module, "RECOVERY_IVL_MSEC", INT2NUM (ZMQ_RECOVERY_IVL)); rb_define_const (zmq_module, "SNDTIMEO", INT2NUM (ZMQ_SNDTIMEO)); rb_define_const (zmq_module, "RCVTIMEO", INT2NUM (ZMQ_RCVTIMEO)); rb_define_const (zmq_module, "NOBLOCK", INT2NUM (ZMQ_NOBLOCK)); rb_define_const (zmq_module, "PAIR", INT2NUM (ZMQ_PAIR)); rb_define_const (zmq_module, "SUB", INT2NUM (ZMQ_SUB)); rb_define_const (zmq_module, "PUB", INT2NUM (ZMQ_PUB)); rb_define_const (zmq_module, "REQ", INT2NUM (ZMQ_REQ)); rb_define_const (zmq_module, "REP", INT2NUM (ZMQ_REP)); return zmq_module; } static VALUE module_version( VALUE self_ ) { int major, minor, patch; zmq_version(&major, &minor, &patch); return rb_ary_new3 (3, INT2NUM (major), INT2NUM (minor), INT2NUM (patch)); } struct poll_state { int event; int nitems; zmq_pollitem_t *items; VALUE io_objects; }; typedef VALUE(*iterfunc)(ANYARGS); static VALUE poll_add_item(VALUE io_, void *ps_) { struct poll_state *state = (struct poll_state *)ps_; long i; for (i = 0; i < RARRAY_LEN (state->io_objects); i++) { if (RARRAY_PTR (state->io_objects)[i] == io_) { #ifdef HAVE_RUBY_IO_H state->items[i].events |= state->event; return Qnil; #else if (CLASS_OF (io_) == socket_class) { state->items[i].events |= state->event; return Qnil; } OpenFile *fptr; GetOpenFile (io_, fptr); if (state->event == ZMQ_POLLOUT && GetWriteFile (fptr) != NULL && fileno (GetWriteFile (fptr)) != state->items[i].fd) { break; } else { state->items[i].events |= state->event; return Qnil; } #endif } } /* Not found in array. Add a new poll item. */ rb_ary_push (state->io_objects, io_); zmq_pollitem_t *item = &state->items[state->nitems]; state->nitems++; item->events = state->event; if (CLASS_OF (io_) == socket_class) { struct zeromq_socket* socket; Data_Get_Struct( io_, struct zeromq_socket, socket ); item->socket = socket->socket; item->fd = -1; } else { item->socket = NULL; #ifdef HAVE_RUBY_IO_H rb_io_t *fptr; GetOpenFile (io_, fptr); item->fd = fileno (rb_io_stdio_file (fptr)); #else OpenFile *fptr; GetOpenFile (io_, fptr); if (state->event == ZMQ_POLLIN && GetReadFile (fptr) != NULL) { item->fd = fileno (GetReadFile (fptr)); } else if (state->event == ZMQ_POLLOUT && GetWriteFile (fptr) != NULL) { item->fd = fileno (GetWriteFile (fptr)); } else if (state->event == ZMQ_POLLERR) { if (GetReadFile(fptr) != NULL) item->fd = fileno (GetReadFile (fptr)); else item->fd = fileno (GetWriteFile (fptr)); } #endif } return Qnil; } #ifdef HAVE_RUBY_INTERN_H struct zmq_poll_args { zmq_pollitem_t *items; int nitems; long timeout_usec; int rc; }; static VALUE zmq_poll_blocking (void* args_) { struct zmq_poll_args *poll_args = (struct zmq_poll_args *)args_; poll_args->rc = zmq_poll (poll_args->items, poll_args->nitems, poll_args->timeout_usec); return Qnil; } #endif struct select_arg { VALUE readset; VALUE writeset; VALUE errset; long timeout_usec; zmq_pollitem_t *items; }; static VALUE internal_select(VALUE argval) { struct select_arg *arg = (struct select_arg *)argval; int rc, nitems, i; zmq_pollitem_t *item; struct poll_state ps; ps.nitems = 0; ps.items = arg->items; ps.io_objects = rb_ary_new (); if (!NIL_P (arg->readset)) { ps.event = ZMQ_POLLIN; rb_iterate(rb_each, arg->readset, (iterfunc)poll_add_item, (VALUE)&ps); } if (!NIL_P (arg->writeset)) { ps.event = ZMQ_POLLOUT; rb_iterate(rb_each, arg->writeset, (iterfunc)poll_add_item, (VALUE)&ps); } if (!NIL_P (arg->errset)) { ps.event = ZMQ_POLLERR; rb_iterate(rb_each, arg->errset, (iterfunc)poll_add_item, (VALUE)&ps); } /* Reset nitems to the actual number of zmq_pollitem_t records we're sending. */ nitems = ps.nitems; #ifdef HAVE_RUBY_INTERN_H if (arg->timeout_usec != 0) { struct zmq_poll_args poll_args; poll_args.items = ps.items; poll_args.nitems = ps.nitems; poll_args.timeout_usec = arg->timeout_usec; rb_thread_blocking_region (zmq_poll_blocking, (void*)&poll_args, NULL, NULL); rc = poll_args.rc; } else #endif rc = zmq_poll (ps.items, ps.nitems, arg->timeout_usec); if (rc == -1) { rb_raise(exception_class, "%s", zmq_strerror (zmq_errno ())); return Qnil; } else if (rc == 0) return Qnil; VALUE read_active = rb_ary_new (); VALUE write_active = rb_ary_new (); VALUE err_active = rb_ary_new (); for (i = 0, item = &ps.items[0]; i < nitems; i++, item++) { if (item->revents != 0) { VALUE io = RARRAY_PTR (ps.io_objects)[i]; if (item->revents & ZMQ_POLLIN) rb_ary_push (read_active, io); if (item->revents & ZMQ_POLLOUT) rb_ary_push (write_active, io); if (item->revents & ZMQ_POLLERR) rb_ary_push (err_active, io); } } return rb_ary_new3 (3, read_active, write_active, err_active); } static VALUE module_select_internal(VALUE readset, VALUE writeset, VALUE errset, long timeout_usec) { size_t nitems; struct select_arg arg; /* Conservative estimate for nitems before we traverse the lists. */ nitems = (NIL_P (readset) ? 0 : RARRAY_LEN (readset)) + (NIL_P (writeset) ? 0 : RARRAY_LEN (writeset)) + (NIL_P (errset) ? 0 : RARRAY_LEN (errset)); arg.items = ALLOC_N(zmq_pollitem_t, nitems); arg.readset = readset; arg.writeset = writeset; arg.errset = errset; arg.timeout_usec = timeout_usec; return rb_ensure(internal_select, (VALUE)&arg, (VALUE (*)())xfree, (VALUE)arg.items); } static VALUE module_select (int argc_, VALUE* argv_, VALUE self_) { VALUE readset, writeset, errset, timeout; rb_scan_args (argc_, argv_, "13", &readset, &writeset, &errset, &timeout); long timeout_usec; if (!NIL_P (readset)) Check_Type (readset, T_ARRAY); if (!NIL_P (writeset)) Check_Type (writeset, T_ARRAY); if (!NIL_P (errset)) Check_Type (errset, T_ARRAY); if (NIL_P (timeout)) timeout_usec = -1; else timeout_usec = (long)(NUM2DBL (timeout) * 1000000); return module_select_internal(readset, writeset, errset, timeout_usec); } PK eqY?X&DB