]> Git Repo - VerusCoin.git/blob - src/httpserver.cpp
http: Wait for worker threads to exit
[VerusCoin.git] / src / httpserver.cpp
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5 #include "httpserver.h"
6
7 #include "chainparamsbase.h"
8 #include "compat.h"
9 #include "util.h"
10 #include "netbase.h"
11 #include "rpcprotocol.h" // For HTTP status codes
12 #include "sync.h"
13 #include "ui_interface.h"
14
15 #include <stdio.h>
16 #include <stdlib.h>
17 #include <string.h>
18
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <signal.h>
22
23 #include <event2/event.h>
24 #include <event2/http.h>
25 #include <event2/thread.h>
26 #include <event2/buffer.h>
27 #include <event2/util.h>
28 #include <event2/keyvalq_struct.h>
29
30 #ifdef EVENT__HAVE_NETINET_IN_H
31 #include <netinet/in.h>
32 #ifdef _XOPEN_SOURCE_EXTENDED
33 #include <arpa/inet.h>
34 #endif
35 #endif
36
37 #include <boost/algorithm/string/case_conv.hpp> // for to_lower()
38 #include <boost/foreach.hpp>
39 #include <boost/scoped_ptr.hpp>
40
41 /** HTTP request work item */
42 class HTTPWorkItem : public HTTPClosure
43 {
44 public:
45     HTTPWorkItem(HTTPRequest* req, const std::string &path, const HTTPRequestHandler& func):
46         req(req), path(path), func(func)
47     {
48     }
49     void operator()()
50     {
51         func(req.get(), path);
52     }
53
54     boost::scoped_ptr<HTTPRequest> req;
55
56 private:
57     std::string path;
58     HTTPRequestHandler func;
59 };
60
61 /** Simple work queue for distributing work over multiple threads.
62  * Work items are simply callable objects.
63  */
64 template <typename WorkItem>
65 class WorkQueue
66 {
67 private:
68     /** Mutex protects entire object */
69     CWaitableCriticalSection cs;
70     CConditionVariable cond;
71     /* XXX in C++11 we can use std::unique_ptr here and avoid manual cleanup */
72     std::deque<WorkItem*> queue;
73     bool running;
74     size_t maxDepth;
75     int numThreads;
76
77     /** RAII object to keep track of number of running worker threads */
78     class ThreadCounter
79     {
80     public:
81         WorkQueue &wq;
82         ThreadCounter(WorkQueue &w): wq(w)
83         {
84             boost::lock_guard<boost::mutex> lock(wq.cs);
85             wq.numThreads += 1;
86         }
87         ~ThreadCounter()
88         {
89             boost::lock_guard<boost::mutex> lock(wq.cs);
90             wq.numThreads -= 1;
91             wq.cond.notify_all();
92         }
93     };
94
95 public:
96     WorkQueue(size_t maxDepth) : running(true),
97                                  maxDepth(maxDepth),
98                                  numThreads(0)
99     {
100     }
101     /*( Precondition: worker threads have all stopped
102      * (call WaitExit)
103      */
104     ~WorkQueue()
105     {
106         while (!queue.empty()) {
107             delete queue.front();
108             queue.pop_front();
109         }
110     }
111     /** Enqueue a work item */
112     bool Enqueue(WorkItem* item)
113     {
114         boost::unique_lock<boost::mutex> lock(cs);
115         if (queue.size() >= maxDepth) {
116             return false;
117         }
118         queue.push_back(item);
119         cond.notify_one();
120         return true;
121     }
122     /** Thread function */
123     void Run()
124     {
125         ThreadCounter count(*this);
126         while (running) {
127             WorkItem* i = 0;
128             {
129                 boost::unique_lock<boost::mutex> lock(cs);
130                 while (running && queue.empty())
131                     cond.wait(lock);
132                 if (!running)
133                     break;
134                 i = queue.front();
135                 queue.pop_front();
136             }
137             (*i)();
138             delete i;
139         }
140     }
141     /** Interrupt and exit loops */
142     void Interrupt()
143     {
144         boost::unique_lock<boost::mutex> lock(cs);
145         running = false;
146         cond.notify_all();
147     }
148     /** Wait for worker threads to exit */
149     void WaitExit()
150     {
151         boost::unique_lock<boost::mutex> lock(cs);
152         while (numThreads > 0)
153             cond.wait(lock);
154     }
155
156     /** Return current depth of queue */
157     size_t Depth()
158     {
159         boost::unique_lock<boost::mutex> lock(cs);
160         return queue.size();
161     }
162 };
163
164 struct HTTPPathHandler
165 {
166     HTTPPathHandler() {}
167     HTTPPathHandler(std::string prefix, bool exactMatch, HTTPRequestHandler handler):
168         prefix(prefix), exactMatch(exactMatch), handler(handler)
169     {
170     }
171     std::string prefix;
172     bool exactMatch;
173     HTTPRequestHandler handler;
174 };
175
176 /** HTTP module state */
177
178 //! libevent event loop
179 static struct event_base* eventBase = 0;
180 //! HTTP server
181 struct evhttp* eventHTTP = 0;
182 //! List of subnets to allow RPC connections from
183 static std::vector<CSubNet> rpc_allow_subnets;
184 //! Work queue for handling longer requests off the event loop thread
185 static WorkQueue<HTTPClosure>* workQueue = 0;
186 //! Handlers for (sub)paths
187 std::vector<HTTPPathHandler> pathHandlers;
188 //! Bound listening sockets
189 std::vector<evhttp_bound_socket *> boundSockets;
190
191 /** Check if a network address is allowed to access the HTTP server */
192 static bool ClientAllowed(const CNetAddr& netaddr)
193 {
194     if (!netaddr.IsValid())
195         return false;
196     BOOST_FOREACH (const CSubNet& subnet, rpc_allow_subnets)
197         if (subnet.Match(netaddr))
198             return true;
199     return false;
200 }
201
202 /** Initialize ACL list for HTTP server */
203 static bool InitHTTPAllowList()
204 {
205     rpc_allow_subnets.clear();
206     rpc_allow_subnets.push_back(CSubNet("127.0.0.0/8")); // always allow IPv4 local subnet
207     rpc_allow_subnets.push_back(CSubNet("::1"));         // always allow IPv6 localhost
208     if (mapMultiArgs.count("-rpcallowip")) {
209         const std::vector<std::string>& vAllow = mapMultiArgs["-rpcallowip"];
210         BOOST_FOREACH (std::string strAllow, vAllow) {
211             CSubNet subnet(strAllow);
212             if (!subnet.IsValid()) {
213                 uiInterface.ThreadSafeMessageBox(
214                     strprintf("Invalid -rpcallowip subnet specification: %s. Valid are a single IP (e.g. 1.2.3.4), a network/netmask (e.g. 1.2.3.4/255.255.255.0) or a network/CIDR (e.g. 1.2.3.4/24).", strAllow),
215                     "", CClientUIInterface::MSG_ERROR);
216                 return false;
217             }
218             rpc_allow_subnets.push_back(subnet);
219         }
220     }
221     std::string strAllowed;
222     BOOST_FOREACH (const CSubNet& subnet, rpc_allow_subnets)
223         strAllowed += subnet.ToString() + " ";
224     LogPrint("http", "Allowing HTTP connections from: %s\n", strAllowed);
225     return true;
226 }
227
228 /** HTTP request method as string - use for logging only */
229 static std::string RequestMethodString(HTTPRequest::RequestMethod m)
230 {
231     switch (m) {
232     case HTTPRequest::GET:
233         return "GET";
234         break;
235     case HTTPRequest::POST:
236         return "POST";
237         break;
238     case HTTPRequest::HEAD:
239         return "HEAD";
240         break;
241     case HTTPRequest::PUT:
242         return "PUT";
243         break;
244     default:
245         return "unknown";
246     }
247 }
248
249 /** HTTP request callback */
250 static void http_request_cb(struct evhttp_request* req, void* arg)
251 {
252     std::unique_ptr<HTTPRequest> hreq(new HTTPRequest(req));
253
254     LogPrint("http", "Received a %s request for %s from %s\n",
255              RequestMethodString(hreq->GetRequestMethod()), hreq->GetURI(), hreq->GetPeer().ToString());
256
257     // Early address-based allow check
258     if (!ClientAllowed(hreq->GetPeer())) {
259         hreq->WriteReply(HTTP_FORBIDDEN);
260         return;
261     }
262
263     // Early reject unknown HTTP methods
264     if (hreq->GetRequestMethod() == HTTPRequest::UNKNOWN) {
265         hreq->WriteReply(HTTP_BADMETHOD);
266         return;
267     }
268
269     // Find registered handler for prefix
270     std::string strURI = hreq->GetURI();
271     std::string path;
272     std::vector<HTTPPathHandler>::const_iterator i = pathHandlers.begin();
273     std::vector<HTTPPathHandler>::const_iterator iend = pathHandlers.end();
274     for (; i != iend; ++i) {
275         bool match = false;
276         if (i->exactMatch)
277             match = (strURI == i->prefix);
278         else
279             match = (strURI.substr(0, i->prefix.size()) == i->prefix);
280         if (match) {
281             path = strURI.substr(i->prefix.size());
282             break;
283         }
284     }
285
286     // Dispatch to worker thread
287     if (i != iend) {
288         std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(hreq.release(), path, i->handler));
289         assert(workQueue);
290         if (workQueue->Enqueue(item.get()))
291             item.release(); /* if true, queue took ownership */
292         else
293             item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");
294     } else {
295         hreq->WriteReply(HTTP_NOTFOUND);
296     }
297 }
298
299 /** Callback to reject HTTP requests after shutdown. */
300 static void http_reject_request_cb(struct evhttp_request* req, void*)
301 {
302     LogPrint("http", "Rejecting request while shutting down\n");
303     evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
304 }
305
306 /** Event dispatcher thread */
307 static void ThreadHTTP(struct event_base* base, struct evhttp* http)
308 {
309     RenameThread("bitcoin-http");
310     LogPrint("http", "Entering http event loop\n");
311     event_base_dispatch(base);
312     // Event loop will be interrupted by InterruptHTTPServer()
313     LogPrint("http", "Exited http event loop\n");
314 }
315
316 /** Bind HTTP server to specified addresses */
317 static bool HTTPBindAddresses(struct evhttp* http)
318 {
319     int defaultPort = GetArg("-rpcport", BaseParams().RPCPort());
320     std::vector<std::pair<std::string, uint16_t> > endpoints;
321
322     // Determine what addresses to bind to
323     if (!mapArgs.count("-rpcallowip")) { // Default to loopback if not allowing external IPs
324         endpoints.push_back(std::make_pair("::1", defaultPort));
325         endpoints.push_back(std::make_pair("127.0.0.1", defaultPort));
326         if (mapArgs.count("-rpcbind")) {
327             LogPrintf("WARNING: option -rpcbind was ignored because -rpcallowip was not specified, refusing to allow everyone to connect\n");
328         }
329     } else if (mapArgs.count("-rpcbind")) { // Specific bind address
330         const std::vector<std::string>& vbind = mapMultiArgs["-rpcbind"];
331         for (std::vector<std::string>::const_iterator i = vbind.begin(); i != vbind.end(); ++i) {
332             int port = defaultPort;
333             std::string host;
334             SplitHostPort(*i, port, host);
335             endpoints.push_back(std::make_pair(host, port));
336         }
337     } else { // No specific bind address specified, bind to any
338         endpoints.push_back(std::make_pair("::", defaultPort));
339         endpoints.push_back(std::make_pair("0.0.0.0", defaultPort));
340     }
341
342     // Bind addresses
343     for (std::vector<std::pair<std::string, uint16_t> >::iterator i = endpoints.begin(); i != endpoints.end(); ++i) {
344         LogPrint("http", "Binding RPC on address %s port %i\n", i->first, i->second);
345         evhttp_bound_socket *bind_handle = evhttp_bind_socket_with_handle(http, i->first.empty() ? NULL : i->first.c_str(), i->second);
346         if (bind_handle) {
347             boundSockets.push_back(bind_handle);
348         } else {
349             LogPrintf("Binding RPC on address %s port %i failed.\n", i->first, i->second);
350         }
351     }
352     return !boundSockets.empty();
353 }
354
355 /** Simple wrapper to set thread name and run work queue */
356 static void HTTPWorkQueueRun(WorkQueue<HTTPClosure>* queue)
357 {
358     RenameThread("bitcoin-httpworker");
359     queue->Run();
360 }
361
362 /** libevent event log callback */
363 static void libevent_log_cb(int severity, const char *msg)
364 {
365 #ifndef EVENT_LOG_WARN
366 // EVENT_LOG_WARN was added in 2.0.19; but before then _EVENT_LOG_WARN existed.
367 # define EVENT_LOG_WARN _EVENT_LOG_WARN
368 #endif
369     if (severity >= EVENT_LOG_WARN) // Log warn messages and higher without debug category
370         LogPrintf("libevent: %s\n", msg);
371     else
372         LogPrint("libevent", "libevent: %s\n", msg);
373 }
374
375 bool InitHTTPServer()
376 {
377     struct evhttp* http = 0;
378     struct event_base* base = 0;
379
380     if (!InitHTTPAllowList())
381         return false;
382
383     if (GetBoolArg("-rpcssl", false)) {
384         uiInterface.ThreadSafeMessageBox(
385             "SSL mode for RPC (-rpcssl) is no longer supported.",
386             "", CClientUIInterface::MSG_ERROR);
387         return false;
388     }
389
390     // Redirect libevent's logging to our own log
391     event_set_log_callback(&libevent_log_cb);
392 #if LIBEVENT_VERSION_NUMBER >= 0x02010100
393     // If -debug=libevent, set full libevent debugging.
394     // Otherwise, disable all libevent debugging.
395     if (LogAcceptCategory("libevent"))
396         event_enable_debug_logging(EVENT_DBG_ALL);
397     else
398         event_enable_debug_logging(EVENT_DBG_NONE);
399 #endif
400 #ifdef WIN32
401     evthread_use_windows_threads();
402 #else
403     evthread_use_pthreads();
404 #endif
405
406     base = event_base_new(); // XXX RAII
407     if (!base) {
408         LogPrintf("Couldn't create an event_base: exiting\n");
409         return false;
410     }
411
412     /* Create a new evhttp object to handle requests. */
413     http = evhttp_new(base); // XXX RAII
414     if (!http) {
415         LogPrintf("couldn't create evhttp. Exiting.\n");
416         event_base_free(base);
417         return false;
418     }
419
420     evhttp_set_timeout(http, GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));
421     evhttp_set_max_body_size(http, MAX_SIZE);
422     evhttp_set_gencb(http, http_request_cb, NULL);
423
424     if (!HTTPBindAddresses(http)) {
425         LogPrintf("Unable to bind any endpoint for RPC server\n");
426         evhttp_free(http);
427         event_base_free(base);
428         return false;
429     }
430
431     LogPrint("http", "Initialized HTTP server\n");
432     int workQueueDepth = std::max((long)GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);
433     LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);
434
435     workQueue = new WorkQueue<HTTPClosure>(workQueueDepth);
436     eventBase = base;
437     eventHTTP = http;
438     return true;
439 }
440
441 bool StartHTTPServer(boost::thread_group& threadGroup)
442 {
443     LogPrint("http", "Starting HTTP server\n");
444     int rpcThreads = std::max((long)GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
445     LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
446     threadGroup.create_thread(boost::bind(&ThreadHTTP, eventBase, eventHTTP));
447
448     for (int i = 0; i < rpcThreads; i++)
449         threadGroup.create_thread(boost::bind(&HTTPWorkQueueRun, workQueue));
450     return true;
451 }
452
453 void InterruptHTTPServer()
454 {
455     LogPrint("http", "Interrupting HTTP server\n");
456     if (eventHTTP) {
457         // Unlisten sockets
458         BOOST_FOREACH (evhttp_bound_socket *socket, boundSockets) {
459             evhttp_del_accept_socket(eventHTTP, socket);
460         }
461         // Reject requests on current connections
462         evhttp_set_gencb(eventHTTP, http_reject_request_cb, NULL);
463     }
464     if (workQueue)
465         workQueue->Interrupt();
466 }
467
468 void StopHTTPServer()
469 {
470     LogPrint("http", "Stopping HTTP server\n");
471     if (workQueue) {
472         LogPrint("http", "Waiting for HTTP worker threads to exit\n");
473         workQueue->WaitExit();
474         delete workQueue;
475     }
476     if (eventHTTP) {
477         evhttp_free(eventHTTP);
478         eventHTTP = 0;
479     }
480     if (eventBase) {
481         event_base_free(eventBase);
482         eventBase = 0;
483     }
484 }
485
486 struct event_base* EventBase()
487 {
488     return eventBase;
489 }
490
491 static void httpevent_callback_fn(evutil_socket_t, short, void* data)
492 {
493     // Static handler: simply call inner handler
494     HTTPEvent *self = ((HTTPEvent*)data);
495     self->handler();
496     if (self->deleteWhenTriggered)
497         delete self;
498 }
499
500 HTTPEvent::HTTPEvent(struct event_base* base, bool deleteWhenTriggered, const boost::function<void(void)>& handler):
501     deleteWhenTriggered(deleteWhenTriggered), handler(handler)
502 {
503     ev = event_new(base, -1, 0, httpevent_callback_fn, this);
504     assert(ev);
505 }
506 HTTPEvent::~HTTPEvent()
507 {
508     event_free(ev);
509 }
510 void HTTPEvent::trigger(struct timeval* tv)
511 {
512     if (tv == NULL)
513         event_active(ev, 0, 0); // immediately trigger event in main thread
514     else
515         evtimer_add(ev, tv); // trigger after timeval passed
516 }
517 HTTPRequest::HTTPRequest(struct evhttp_request* req) : req(req),
518                                                        replySent(false)
519 {
520 }
521 HTTPRequest::~HTTPRequest()
522 {
523     if (!replySent) {
524         // Keep track of whether reply was sent to avoid request leaks
525         LogPrintf("%s: Unhandled request\n", __func__);
526         WriteReply(HTTP_INTERNAL, "Unhandled request");
527     }
528     // evhttpd cleans up the request, as long as a reply was sent.
529 }
530
531 std::pair<bool, std::string> HTTPRequest::GetHeader(const std::string& hdr)
532 {
533     const struct evkeyvalq* headers = evhttp_request_get_input_headers(req);
534     assert(headers);
535     const char* val = evhttp_find_header(headers, hdr.c_str());
536     if (val)
537         return std::make_pair(true, val);
538     else
539         return std::make_pair(false, "");
540 }
541
542 std::string HTTPRequest::ReadBody()
543 {
544     struct evbuffer* buf = evhttp_request_get_input_buffer(req);
545     if (!buf)
546         return "";
547     size_t size = evbuffer_get_length(buf);
548     /** Trivial implementation: if this is ever a performance bottleneck,
549      * internal copying can be avoided in multi-segment buffers by using
550      * evbuffer_peek and an awkward loop. Though in that case, it'd be even
551      * better to not copy into an intermediate string but use a stream
552      * abstraction to consume the evbuffer on the fly in the parsing algorithm.
553      */
554     const char* data = (const char*)evbuffer_pullup(buf, size);
555     if (!data) // returns NULL in case of empty buffer
556         return "";
557     std::string rv(data, size);
558     evbuffer_drain(buf, size);
559     return rv;
560 }
561
562 void HTTPRequest::WriteHeader(const std::string& hdr, const std::string& value)
563 {
564     struct evkeyvalq* headers = evhttp_request_get_output_headers(req);
565     assert(headers);
566     evhttp_add_header(headers, hdr.c_str(), value.c_str());
567 }
568
569 /** Closure sent to main thread to request a reply to be sent to
570  * a HTTP request.
571  * Replies must be sent in the main loop in the main http thread,
572  * this cannot be done from worker threads.
573  */
574 void HTTPRequest::WriteReply(int nStatus, const std::string& strReply)
575 {
576     assert(!replySent && req);
577     // Send event to main http thread to send reply message
578     struct evbuffer* evb = evhttp_request_get_output_buffer(req);
579     assert(evb);
580     evbuffer_add(evb, strReply.data(), strReply.size());
581     HTTPEvent* ev = new HTTPEvent(eventBase, true,
582         boost::bind(evhttp_send_reply, req, nStatus, (const char*)NULL, (struct evbuffer *)NULL));
583     ev->trigger(0);
584     replySent = true;
585     req = 0; // transferred back to main thread
586 }
587
588 CService HTTPRequest::GetPeer()
589 {
590     evhttp_connection* con = evhttp_request_get_connection(req);
591     CService peer;
592     if (con) {
593         // evhttp retains ownership over returned address string
594         const char* address = "";
595         uint16_t port = 0;
596         evhttp_connection_get_peer(con, (char**)&address, &port);
597         peer = CService(address, port);
598     }
599     return peer;
600 }
601
602 std::string HTTPRequest::GetURI()
603 {
604     return evhttp_request_get_uri(req);
605 }
606
607 HTTPRequest::RequestMethod HTTPRequest::GetRequestMethod()
608 {
609     switch (evhttp_request_get_command(req)) {
610     case EVHTTP_REQ_GET:
611         return GET;
612         break;
613     case EVHTTP_REQ_POST:
614         return POST;
615         break;
616     case EVHTTP_REQ_HEAD:
617         return HEAD;
618         break;
619     case EVHTTP_REQ_PUT:
620         return PUT;
621         break;
622     default:
623         return UNKNOWN;
624         break;
625     }
626 }
627
628 void RegisterHTTPHandler(const std::string &prefix, bool exactMatch, const HTTPRequestHandler &handler)
629 {
630     LogPrint("http", "Registering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
631     pathHandlers.push_back(HTTPPathHandler(prefix, exactMatch, handler));
632 }
633
634 void UnregisterHTTPHandler(const std::string &prefix, bool exactMatch)
635 {
636     std::vector<HTTPPathHandler>::iterator i = pathHandlers.begin();
637     std::vector<HTTPPathHandler>::iterator iend = pathHandlers.end();
638     for (; i != iend; ++i)
639         if (i->prefix == prefix && i->exactMatch == exactMatch)
640             break;
641     if (i != iend)
642     {
643         LogPrint("http", "Unregistering HTTP handler for %s (exactmatch %d)\n", prefix, exactMatch);
644         pathHandlers.erase(i);
645     }
646 }
647
This page took 0.059511 seconds and 4 git commands to generate.