]>
Commit | Line | Data |
---|---|---|
99eb947a S |
1 | // Copyright (c) 2017 The Zcash developers |
2 | // Distributed under the MIT software license, see the accompanying | |
bc909a7a | 3 | // file COPYING or https://www.opensource.org/licenses/mit-license.php . |
99eb947a S |
4 | |
5 | #include "amqpnotificationinterface.h" | |
6 | #include "amqppublishnotifier.h" | |
7 | ||
8 | #include "version.h" | |
9 | #include "main.h" | |
10 | #include "streams.h" | |
11 | #include "util.h" | |
12 | ||
13 | // AMQP 1.0 Support | |
14 | // | |
15 | // The boost::signals2 signals and slot system is thread safe, so CValidationInterface listeners | |
16 | // can be invoked from any thread. | |
17 | // | |
18 | // Currently signals are fired from main.cpp so the callbacks should be invoked on the same thread. | |
19 | // It should be safe to share objects responsible for sending, as they should not be run concurrently | |
20 | // across different threads. | |
21 | // | |
22 | // Developers should be mindful of where notifications are fired to avoid potential race conditions. | |
23 | // For example, different signals targeting the same address could be fired from different threads | |
24 | // in different parts of the system around the same time. | |
25 | // | |
26 | // Like the ZMQ notification interface, if a notifier fails to send a message, the notifier is shut down. | |
27 | // | |
28 | ||
29 | AMQPNotificationInterface::AMQPNotificationInterface() | |
30 | { | |
31 | } | |
32 | ||
33 | AMQPNotificationInterface::~AMQPNotificationInterface() | |
34 | { | |
35 | Shutdown(); | |
36 | ||
37 | for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { | |
38 | delete *i; | |
39 | } | |
40 | } | |
41 | ||
42 | AMQPNotificationInterface* AMQPNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args) | |
43 | { | |
44 | AMQPNotificationInterface* notificationInterface = nullptr; | |
45 | std::map<std::string, AMQPNotifierFactory> factories; | |
46 | std::list<AMQPAbstractNotifier*> notifiers; | |
47 | ||
48 | factories["pubhashblock"] = AMQPAbstractNotifier::Create<AMQPPublishHashBlockNotifier>; | |
49 | factories["pubhashtx"] = AMQPAbstractNotifier::Create<AMQPPublishHashTransactionNotifier>; | |
50 | factories["pubrawblock"] = AMQPAbstractNotifier::Create<AMQPPublishRawBlockNotifier>; | |
51 | factories["pubrawtx"] = AMQPAbstractNotifier::Create<AMQPPublishRawTransactionNotifier>; | |
52 | ||
53 | for (std::map<std::string, AMQPNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) { | |
54 | std::map<std::string, std::string>::const_iterator j = args.find("-amqp" + i->first); | |
55 | if (j!=args.end()) { | |
56 | AMQPNotifierFactory factory = i->second; | |
57 | std::string address = j->second; | |
58 | AMQPAbstractNotifier *notifier = factory(); | |
59 | notifier->SetType(i->first); | |
60 | notifier->SetAddress(address); | |
61 | notifiers.push_back(notifier); | |
62 | } | |
63 | } | |
64 | ||
65 | if (!notifiers.empty()) { | |
66 | notificationInterface = new AMQPNotificationInterface(); | |
67 | notificationInterface->notifiers = notifiers; | |
68 | ||
69 | if (!notificationInterface->Initialize()) { | |
70 | delete notificationInterface; | |
71 | notificationInterface = nullptr; | |
72 | } | |
73 | } | |
74 | ||
75 | return notificationInterface; | |
76 | } | |
77 | ||
78 | // Called at startup to conditionally set up | |
79 | bool AMQPNotificationInterface::Initialize() | |
80 | { | |
81 | LogPrint("amqp", "amqp: Initialize notification interface\n"); | |
82 | ||
83 | std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); | |
84 | for (; i != notifiers.end(); ++i) { | |
85 | AMQPAbstractNotifier *notifier = *i; | |
86 | if (notifier->Initialize()) { | |
87 | LogPrint("amqp", "amqp: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); | |
88 | } else { | |
89 | LogPrint("amqp", "amqp: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); | |
90 | break; | |
91 | } | |
92 | } | |
93 | ||
94 | if (i != notifiers.end()) { | |
95 | return false; | |
96 | } | |
97 | ||
98 | return true; | |
99 | } | |
100 | ||
101 | // Called during shutdown sequence | |
102 | void AMQPNotificationInterface::Shutdown() | |
103 | { | |
104 | LogPrint("amqp", "amqp: Shutdown notification interface\n"); | |
105 | ||
106 | for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) { | |
107 | AMQPAbstractNotifier *notifier = *i; | |
108 | notifier->Shutdown(); | |
109 | } | |
110 | } | |
111 | ||
112 | void AMQPNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex) | |
113 | { | |
114 | for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) { | |
115 | AMQPAbstractNotifier *notifier = *i; | |
116 | if (notifier->NotifyBlock(pindex)) { | |
117 | i++; | |
118 | } else { | |
119 | notifier->Shutdown(); | |
120 | i = notifiers.erase(i); | |
121 | } | |
122 | } | |
123 | } | |
124 | ||
125 | void AMQPNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) | |
126 | { | |
127 | for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) { | |
128 | AMQPAbstractNotifier *notifier = *i; | |
129 | if (notifier->NotifyTransaction(tx)) { | |
130 | i++; | |
131 | } else { | |
132 | notifier->Shutdown(); | |
133 | i = notifiers.erase(i); | |
134 | } | |
135 | } | |
136 | } |