]> Git Repo - VerusCoin.git/commitdiff
Add ZeroMQ support. Notify blocks and transactions via ZeroMQ
authorJeff Garzik <[email protected]>
Tue, 18 Nov 2014 17:06:32 +0000 (12:06 -0500)
committerJack Grigg <[email protected]>
Wed, 8 Feb 2017 22:10:42 +0000 (22:10 +0000)
Continues Johnathan Corgan's work.
Publishing multipart messages

Bugfix: Add missing zmq header includes

Bugfix: Adjust build system to link ZeroMQ code for Qt binaries

16 files changed:
configure.ac
contrib/zmq/zmq_sub.py [new file with mode: 0755]
doc/zmq.md [new file with mode: 0644]
src/Makefile.am
src/Makefile.qt.include
src/Makefile.qttest.include
src/Makefile.test.include
src/init.cpp
src/validationinterface.cpp
src/zmq/zmqabstractnotifier.cpp [new file with mode: 0644]
src/zmq/zmqabstractnotifier.h [new file with mode: 0644]
src/zmq/zmqconfig.h [new file with mode: 0644]
src/zmq/zmqnotificationinterface.cpp [new file with mode: 0644]
src/zmq/zmqnotificationinterface.h [new file with mode: 0644]
src/zmq/zmqpublishnotifier.cpp [new file with mode: 0644]
src/zmq/zmqpublishnotifier.h [new file with mode: 0644]

index 8a666ec6bca6bc85bf61c6f26f79a60cf2d85227..3d312ca809c1b4ff7d6fc6b7fb22d7a13488bb07 100644 (file)
@@ -150,6 +150,12 @@ AC_ARG_ENABLE([glibc-back-compat],
   [use_glibc_compat=$enableval],
   [use_glibc_compat=no])
 
+AC_ARG_ENABLE([zmq],
+  [AC_HELP_STRING([--disable-zmq],
+  [Disable ZMQ notifications])],
+  [use_zmq=$enableval],
+  [use_zmq=yes])
+
 AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], [])
 
 # Enable debug 
@@ -857,6 +863,22 @@ if test x$bitcoin_enable_qt != xno; then
   fi
 fi
 
+# conditional search for and use libzmq
+AC_MSG_CHECKING([whether to build ZMQ support])
+if test "x$use_zmq" = "xyes"; then
+  AC_MSG_RESULT([yes])
+  PKG_CHECK_MODULES([ZMQ],[libzmq],
+    [AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])],
+    [AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
+     AC_MSG_WARN([libzmq not found, disabling])
+     use_zmq=no])
+else
+  AC_MSG_RESULT([no, --disable-zmq used])
+  AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
+fi
+
+AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"])
+
 AC_MSG_CHECKING([whether to build test_bitcoin])
 if test x$use_tests = xyes; then
   AC_MSG_RESULT([yes])
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py
new file mode 100755 (executable)
index 0000000..decf29d
--- /dev/null
@@ -0,0 +1,37 @@
+#!/usr/bin/env python2
+
+import array
+import binascii
+import zmq
+
+port = 28332
+
+zmqContext = zmq.Context()
+zmqSubSocket = zmqContext.socket(zmq.SUB)
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
+zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
+
+try:
+    while True:
+        msg = zmqSubSocket.recv_multipart()
+        topic = str(msg[0])
+        body = msg[1]
+
+        if topic == "hashblock":
+            print "- HASH BLOCK -"
+            print binascii.hexlify(body)
+        elif topic == "hashtx":
+            print '- HASH TX -'
+            print binascii.hexlify(body)
+        elif topic == "rawblock":
+            print "- RAW BLOCK HEADER -"
+            print binascii.hexlify(body[:80])
+        elif topic == "rawtx":
+            print '- RAW TX -'
+            print binascii.hexlify(body)
+
+except KeyboardInterrupt:
+    zmqContext.destroy()
diff --git a/doc/zmq.md b/doc/zmq.md
new file mode 100644 (file)
index 0000000..fd04f6d
--- /dev/null
@@ -0,0 +1,98 @@
+# Block and Transaction Broadcasting With ZeroMQ
+
+[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP
+connections, inter-process communications, and shared-memory,
+providing various message-oriented semantics such as publish/subcribe,
+request/reply, and push/pull.
+
+The Bitcoin Core daemon can be configured to act as a trusted "border
+router", implementing the bitcoin wire protocol and relay, making
+consensus decisions, maintaining the local blockchain database,
+broadcasting locally generated transactions into the network, and
+providing a queryable RPC interface to interact on a polled basis for
+requesting blockchain related data. However, there exists only a
+limited service to notify external software of events like the arrival
+of new blocks or transactions.
+
+The ZeroMQ facility implements a notification interface through a
+set of specific notifiers. Currently there are notifiers that publish
+blocks and transactions. This read-only facility requires only the
+connection of a corresponding ZeroMQ subscriber port in receiving 
+software; it is not authenticated nor is there any two-way protocol
+involvement. Therefore, subscribers should validate the received data
+since it may be out of date, incomplete or even invalid.
+
+ZeroMQ sockets are self-connecting and self-healing; that is, connects
+made between two endpoints will be automatically restored after an
+outage, and either end may be freely started or stopped in any order.
+
+Because ZeroMQ is message oriented, subscribers receive transactions
+and blocks all-at-once and do not need to implement any sort of
+buffering or reassembly.
+
+## Prerequisites
+
+The ZeroMQ feature in Bitcoin Core uses only a very small part of the
+ZeroMQ C API, and is thus compatible with any version of ZeroMQ
+from 2.1 onward, including all versions in the 3.x and 4.x release
+series. Typically, it is packaged by distributions as something like
+*libzmq-dev*.
+
+The C++ wrapper for ZeroMQ is *not* needed.
+
+## Enabling
+
+By default, the ZeroMQ port functionality is enabled. Two steps are
+required to enable--compiling in the ZeroMQ code, and configuring
+runtime operation on the command-line or configuration file.
+
+    $ ./configure --enable-zmq (other options)
+
+This will produce a binary that is capable of providing the ZeroMQ
+facility, but will not do so until also configured properly.
+
+## Usage
+
+Currently, the following notifications are supported:
+
+    -zmqpubhashtx=address
+    -zmqpubhashblock=address
+    -zmqpubrawblock=address
+    -zmqpubrawtx=address
+
+The socket type is PUB and the address must be a valid ZeroMQ
+socket address. The same address can be used in more than one notification.
+
+For instance:
+
+    $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
+
+Each PUB notification has a topic and body, where the header
+corresponds to the notification type. For instance, for the notification
+`-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the
+hexadecimal transaction hash (32 bytes).
+
+These options can also be provided in bitcoin.conf.
+
+ZeroMQ endpoint specifiers for TCP (and others) are documented in the
+[ZeroMQ API](http://api.zeromq.org).
+
+Client side, then, the ZeroMQ subscriber socket must have the
+ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without
+doing so will result in no messages arriving. Please see `contrib/zmq/zmq_sub.py`
+for a working example.
+
+## Remarks
+
+From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB
+sockets don't even have a read function. Thus, there is no state
+introduced into bitcoind directly. Furthermore, no information is
+broadcast that wasn't already received from the public P2P network.
+
+No authentication or authorization is done on connecting clients; it
+is assumed that the ZeroMQ port is exposed only to trusted entities,
+using other means such as firewalling.
+
+Note that when the block chain tip changes, a reorganisation may occur and just
+the tip will be notified. It is up to the subscriber to retrieve the chain
+from the last known block to the new tip.
index f18071678234c913c97a5827c510133218ad878f..54743878b0c016fc3201f8f01bf8dc51c9dce7dd 100644 (file)
@@ -50,6 +50,9 @@ if ENABLE_WALLET
 BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
 EXTRA_LIBRARIES += libbitcoin_wallet.a
 endif
+if ENABLE_ZMQ
+EXTRA_LIBRARIES += libbitcoin_zmq.a
+endif
 
 if BUILD_BITCOIN_LIBS
 lib_LTLIBRARIES = libzcashconsensus.la
@@ -172,7 +175,12 @@ BITCOIN_CORE_H = \
   wallet/db.h \
   wallet/wallet.h \
   wallet/wallet_ismine.h \
-  wallet/walletdb.h
+  wallet/walletdb.h \
+  zmq/zmqabstractnotifier.h \
+  zmq/zmqconfig.h\
+  zmq/zmqnotificationinterface.h \
+  zmq/zmqpublishnotifier.h
+
 
 JSON_H = \
   json/json_spirit.h \
@@ -229,6 +237,17 @@ libbitcoin_server_a_SOURCES = \
   $(BITCOIN_CORE_H) \
   $(LIBZCASH_H)
 
+if ENABLE_ZMQ
+LIBBITCOIN_ZMQ=libbitcoin_zmq.a
+
+libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES)
+libbitcoin_zmq_a_SOURCES = \
+  zmq/zmqabstractnotifier.cpp \
+  zmq/zmqnotificationinterface.cpp \
+  zmq/zmqpublishnotifier.cpp
+endif
+
+
 # wallet: shared between bitcoind and bitcoin-qt, but only linked
 # when wallet enabled
 libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
@@ -369,6 +388,10 @@ zcashd_LDADD = \
   $(LIBMEMENV) \
   $(LIBSECP256K1)
 
+if ENABLE_ZMQ
+zcashd_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
+
 if ENABLE_WALLET
 zcashd_LDADD += libbitcoin_wallet.a
 endif
@@ -382,7 +405,6 @@ zcashd_LDADD += \
   $(LIBZCASH) \
   $(LIBBITCOIN_CRYPTO) \
   $(LIBZCASH_LIBS)
-#
 
 # bitcoin-cli binary #
 zcash_cli_SOURCES = bitcoin-cli.cpp
index d45de8d20c76e83147ebf0be0fd01c085bd5c929..9513da0805409257c66bfbee5d623190171907dc 100644 (file)
@@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
 if ENABLE_WALLET
 qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
 endif
+if ENABLE_ZMQ
+qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
 qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
   $(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) $(LIBZCASH_LIBS)
 qt_bitcoin_qt_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(QT_LDFLAGS) $(LIBTOOL_APP_LDFLAGS)
index 4963a396b2bfcffc33aabeabc908a7f35c9e2555..128c3f6d693bc812bd3bd2f9a97c1e71f0a2877d 100644 (file)
@@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
 if ENABLE_WALLET
 qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
 endif
+if ENABLE_ZMQ
+qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
 qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
   $(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
   $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) $(LIBZCASH_LIBS)
index 09d34043ae26c056fedfb10b60c7e8ff482f28ed..56be626d63063a55ab7f0decbdb138b6a2c85e99 100644 (file)
@@ -107,6 +107,10 @@ endif
 test_test_bitcoin_LDADD += $(LIBZCASH_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBZCASH) $(LIBZCASH_LIBS)
 test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
 
+if ENABLE_ZMQ
+test_test_bitcoin_LDADD += $(ZMQ_LIBS)
+endif
+
 nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)
 
 $(BITCOIN_TESTS): $(GENERATED_TEST_FILES)
index cf09fe06f1d8938e9f28a3aa4b5b79ff7c4284e4..3fc30a2858328adc7312e2ab2226dfa44a153fa5 100644 (file)
@@ -31,7 +31,6 @@
 #include "wallet/wallet.h"
 #include "wallet/walletdb.h"
 #endif
-
 #include <stdint.h>
 #include <stdio.h>
 
 
 #include "libsnark/common/profiling.hpp"
 
+#if ENABLE_ZMQ
+#include "zmq/zmqnotificationinterface.h"
+#endif
+
 using namespace std;
 
 extern void ThreadSendAlert();
@@ -61,6 +64,10 @@ CWallet* pwalletMain = NULL;
 #endif
 bool fFeeEstimatesInitialized = false;
 
+#if ENABLE_ZMQ
+static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
+#endif
+
 #ifdef WIN32
 // Win32 LevelDB doesn't use file descriptors, and the ones used for
 // accessing block files don't count towards the fd_set size limit
@@ -197,6 +204,16 @@ void Shutdown()
     if (pwalletMain)
         pwalletMain->Flush(true);
 #endif
+
+#if ENABLE_ZMQ
+    if (pzmqNotificationInterface) {
+        UnregisterValidationInterface(pzmqNotificationInterface);
+        pzmqNotificationInterface->Shutdown();
+        delete pzmqNotificationInterface;
+        pzmqNotificationInterface = NULL;
+    }
+#endif
+
 #ifndef WIN32
     try {
         boost::filesystem::remove(GetPidFile());
@@ -365,6 +382,14 @@ std::string HelpMessage(HelpMessageMode mode)
                     
 #endif
 
+#if ENABLE_ZMQ
+    strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
+    strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubhashtransaction=<address>", _("Enable publish hash transaction in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubrawtransaction=<address>", _("Enable publish raw transaction in <address>"));
+#endif
+
     strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
     if (showDebug)
     {
@@ -1141,6 +1166,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
     BOOST_FOREACH(string strDest, mapMultiArgs["-seednode"])
         AddOneShot(strDest);
 
+#if ENABLE_ZMQ
+    pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);
+
+    if (pzmqNotificationInterface) {
+        pzmqNotificationInterface->Initialize();
+        RegisterValidationInterface(pzmqNotificationInterface);
+    }
+#endif
+
     // ********************************************************* Step 7: load block chain
 
     fReindex = GetBoolArg("-reindex", false);
index d06fd9a85d8fb41cee286ca6a9ca170e7ec4af59..cd3e30f3d470830a486e9753bba67cb9d9a5c667 100644 (file)
@@ -45,7 +45,6 @@ void UnregisterAllValidationInterfaces() {
     g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.EraseTransaction.disconnect_all_slots();
     g_signals.SyncTransaction.disconnect_all_slots();
-    g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.UpdatedBlockTip.disconnect_all_slots();
 }
 
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
new file mode 100644 (file)
index 0000000..744ec59
--- /dev/null
@@ -0,0 +1,22 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqabstractnotifier.h"
+#include "util.h"
+
+
+CZMQAbstractNotifier::~CZMQAbstractNotifier()
+{
+    assert(!psocket);
+}
+
+bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/)
+{
+    return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
+{
+    return true;
+}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
new file mode 100644 (file)
index 0000000..626d1dd
--- /dev/null
@@ -0,0 +1,42 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+
+#include "zmqconfig.h"
+
+class CZMQAbstractNotifier;
+typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
+
+class CZMQAbstractNotifier
+{
+public:
+    CZMQAbstractNotifier() : psocket(0) { }
+    virtual ~CZMQAbstractNotifier();
+
+    template <typename T>
+    static CZMQAbstractNotifier* Create()
+    {
+        return new T();
+    }
+
+    std::string GetType() const { return type; }
+    void SetType(const std::string &t) { type = t; }
+    std::string GetAddress() const { return address; }
+    void SetAddress(const std::string &a) { address = a; }
+
+    virtual bool Initialize(void *pcontext) = 0;
+    virtual void Shutdown() = 0;
+
+    virtual bool NotifyBlock(const uint256 &hash);
+    virtual bool NotifyTransaction(const CTransaction &transaction);
+
+protected:
+    void *psocket;
+    std::string type;
+    std::string address;
+};
+
+#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h
new file mode 100644 (file)
index 0000000..6057f5d
--- /dev/null
@@ -0,0 +1,24 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
+#define BITCOIN_ZMQ_ZMQCONFIG_H
+
+#if defined(HAVE_CONFIG_H)
+#include "config/bitcoin-config.h"
+#endif
+
+#include <stdarg.h>
+#include <string>
+
+#if ENABLE_ZMQ
+#include <zmq.h>
+#endif
+
+#include "primitives/block.h"
+#include "primitives/transaction.h"
+
+void zmqError(const char *str);
+
+#endif // BITCOIN_ZMQ_ZMQCONFIG_H
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
new file mode 100644 (file)
index 0000000..71ccb59
--- /dev/null
@@ -0,0 +1,155 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqnotificationinterface.h"
+#include "zmqpublishnotifier.h"
+
+#include "version.h"
+#include "main.h"
+#include "streams.h"
+#include "util.h"
+
+void zmqError(const char *str)
+{
+    LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
+}
+
+CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
+{
+}
+
+CZMQNotificationInterface::~CZMQNotificationInterface()
+{
+    // ensure Shutdown if Initialize is called
+    assert(!pcontext);
+
+    for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+    {
+        delete *i;
+    }
+}
+
+CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
+{
+    CZMQNotificationInterface* notificationInterface = NULL;
+    std::map<std::string, CZMQNotifierFactory> factories;
+    std::list<CZMQAbstractNotifier*> notifiers;
+
+    factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
+    factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
+    factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
+    factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+
+    for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
+    {
+        std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
+        if (j!=args.end())
+        {
+            CZMQNotifierFactory factory = i->second;
+            std::string address = j->second;
+            CZMQAbstractNotifier *notifier = factory();
+            notifier->SetType(i->first);
+            notifier->SetAddress(address);
+            notifiers.push_back(notifier);
+        }
+    }
+
+    if (!notifiers.empty())
+    {
+        notificationInterface = new CZMQNotificationInterface();
+        notificationInterface->notifiers = notifiers;
+    }
+
+    return notificationInterface;
+}
+
+// Called at startup to conditionally set up ZMQ socket(s)
+bool CZMQNotificationInterface::Initialize()
+{
+    LogPrint("zmq", "Initialize notification interface\n");
+    assert(!pcontext);
+
+    pcontext = zmq_init(1);
+
+    if (!pcontext)
+    {
+        zmqError("Unable to initialize context");
+        return false;
+    }
+
+    std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
+    for (; i!=notifiers.end(); ++i)
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->Initialize(pcontext))
+        {
+            LogPrint("zmq", "  Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+        }
+        else
+        {
+            LogPrint("zmq", "  Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+            break;
+        }
+    }
+
+    if (i!=notifiers.end())
+    {
+        Shutdown();
+        return false;
+    }
+
+    return false;
+}
+
+// Called during shutdown sequence
+void CZMQNotificationInterface::Shutdown()
+{
+    LogPrint("zmq", "Shutdown notification interface\n");
+    if (pcontext)
+    {
+        for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+        {
+            CZMQAbstractNotifier *notifier = *i;
+            LogPrint("zmq", "   Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+            notifier->Shutdown();
+        }
+        zmq_ctx_destroy(pcontext);
+
+        pcontext = 0;
+    }
+}
+
+void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
+{
+    for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->NotifyBlock(hash))
+        {
+            i++;
+        }
+        else
+        {
+            notifier->Shutdown();
+            i = notifiers.erase(i);
+        }
+    }
+}
+
+void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
+{
+    for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->NotifyTransaction(tx))
+        {
+            i++;
+        }
+        else
+        {
+            notifier->Shutdown();
+            i = notifiers.erase(i);
+        }
+    }
+}
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
new file mode 100644 (file)
index 0000000..afc0b8d
--- /dev/null
@@ -0,0 +1,35 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+
+#include "validationinterface.h"
+#include <string>
+#include <map>
+
+class CZMQAbstractNotifier;
+
+class CZMQNotificationInterface : public CValidationInterface
+{
+public:
+    virtual ~CZMQNotificationInterface();
+
+    static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
+
+    bool Initialize();
+    void Shutdown();
+
+protected: // CValidationInterface
+    void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
+    void UpdatedBlockTip(const uint256 &newHashTip);
+
+private:
+    CZMQNotificationInterface();
+
+    void *pcontext;
+    std::list<CZMQAbstractNotifier*> notifiers;
+};
+
+#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
new file mode 100644 (file)
index 0000000..0a6d7d0
--- /dev/null
@@ -0,0 +1,172 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqpublishnotifier.h"
+#include "main.h"
+#include "util.h"
+
+static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+
+// Internal function to send multipart message
+static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
+{
+    va_list args;
+    va_start(args, size);
+
+    while (1)
+    {
+        zmq_msg_t msg;
+
+        int rc = zmq_msg_init_size(&msg, size);
+        if (rc != 0)
+        {
+            zmqError("Unable to initialize ZMQ msg");
+            return -1;
+        }
+
+        void *buf = zmq_msg_data(&msg);
+        memcpy(buf, data, size);
+
+        data = va_arg(args, const void*);
+
+        rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
+        if (rc == -1)
+        {
+            zmqError("Unable to send ZMQ msg");
+            zmq_msg_close(&msg);
+            return -1;
+        }
+
+        zmq_msg_close(&msg);
+
+        if (!data)
+            break;
+
+        size = va_arg(args, size_t);
+    }
+    return 0;
+}
+
+bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
+{
+    assert(!psocket);
+
+    // check if address is being used by other publish notifier
+    std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
+
+    if (i==mapPublishNotifiers.end())
+    {
+        psocket = zmq_socket(pcontext, ZMQ_PUB);
+        if (!psocket)
+        {
+            zmqError("Failed to create socket");
+            return false;
+        }
+
+        int rc = zmq_bind(psocket, address.c_str());
+        if (rc!=0)
+        {
+            zmqError("Failed to bind address");
+            return false;
+        }
+
+        // register this notifier for the address, so it can be reused for other publish notifier
+        mapPublishNotifiers.insert(std::make_pair(address, this));
+        return true;
+    }
+    else
+    {
+        LogPrint("zmq", "  Reuse socket for address %s\n", address);
+
+        psocket = i->second->psocket;
+        mapPublishNotifiers.insert(std::make_pair(address, this));
+
+        return true;
+    }
+}
+
+void CZMQAbstractPublishNotifier::Shutdown()
+{
+    assert(psocket);
+
+    int count = mapPublishNotifiers.count(address);
+
+    // remove this notifier from the list of publishers using this address
+    typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
+    std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
+
+    for (iterator it = iterpair.first; it != iterpair.second; ++it)
+    {
+        if (it->second==this)
+        {
+            mapPublishNotifiers.erase(it);
+            break;
+        }
+    }
+
+    if (count == 1)
+    {
+        LogPrint("zmq", "Close socket at address %s\n", address);
+        int linger = 0;
+        zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
+        zmq_close(psocket);
+    }
+
+    psocket = 0;
+}
+
+bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+    LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
+    char data[32];
+    for (unsigned int i = 0; i < 32; i++)
+        data[31 - i] = hash.begin()[i];
+    int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
+    return rc == 0;
+}
+
+bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+    uint256 hash = transaction.GetHash();
+    LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
+    char data[32];
+    for (unsigned int i = 0; i < 32; i++)
+        data[31 - i] = hash.begin()[i];
+    int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
+    return rc == 0;
+}
+
+bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+    LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
+
+    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+    {
+        LOCK(cs_main);
+
+        CBlock block;
+        CBlockIndex* pblockindex = mapBlockIndex[hash];
+
+        if(!ReadBlockFromDisk(block, pblockindex))
+        {
+            zmqError("Can't read block from disk");
+            return false;
+        }
+
+        ss << block;
+    }
+
+    int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
+    return rc == 0;
+}
+
+bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+    uint256 hash = transaction.GetHash();
+    LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
+    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+    ss << transaction;
+    int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
+    return rc == 0;
+}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
new file mode 100644 (file)
index 0000000..a0eb26f
--- /dev/null
@@ -0,0 +1,41 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+
+#include "zmqabstractnotifier.h"
+
+class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
+{
+public:
+    bool Initialize(void *pcontext);
+    void Shutdown();
+};
+
+class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyBlock(const uint256 &hash);
+};
+
+class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyTransaction(const CTransaction &transaction);
+};
+
+class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyBlock(const uint256 &hash);
+};
+
+class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyTransaction(const CTransaction &transaction);
+};
+
+#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
This page took 0.061513 seconds and 4 git commands to generate.