]> Git Repo - VerusCoin.git/blame - src/amqp/amqpnotificationinterface.cpp
Testnet fixes
[VerusCoin.git] / src / amqp / amqpnotificationinterface.cpp
CommitLineData
99eb947a
S
1// Copyright (c) 2017 The Zcash developers
2// Distributed under the MIT software license, see the accompanying
bc909a7a 3// file COPYING or https://www.opensource.org/licenses/mit-license.php .
99eb947a
S
4
5#include "amqpnotificationinterface.h"
6#include "amqppublishnotifier.h"
7
8#include "version.h"
9#include "main.h"
10#include "streams.h"
11#include "util.h"
12
13// AMQP 1.0 Support
14//
15// The boost::signals2 signals and slot system is thread safe, so CValidationInterface listeners
16// can be invoked from any thread.
17//
18// Currently signals are fired from main.cpp so the callbacks should be invoked on the same thread.
19// It should be safe to share objects responsible for sending, as they should not be run concurrently
20// across different threads.
21//
22// Developers should be mindful of where notifications are fired to avoid potential race conditions.
23// For example, different signals targeting the same address could be fired from different threads
24// in different parts of the system around the same time.
25//
26// Like the ZMQ notification interface, if a notifier fails to send a message, the notifier is shut down.
27//
28
29AMQPNotificationInterface::AMQPNotificationInterface()
30{
31}
32
33AMQPNotificationInterface::~AMQPNotificationInterface()
34{
35 Shutdown();
36
37 for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) {
38 delete *i;
39 }
40}
41
42AMQPNotificationInterface* AMQPNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
43{
44 AMQPNotificationInterface* notificationInterface = nullptr;
45 std::map<std::string, AMQPNotifierFactory> factories;
46 std::list<AMQPAbstractNotifier*> notifiers;
47
48 factories["pubhashblock"] = AMQPAbstractNotifier::Create<AMQPPublishHashBlockNotifier>;
49 factories["pubhashtx"] = AMQPAbstractNotifier::Create<AMQPPublishHashTransactionNotifier>;
50 factories["pubrawblock"] = AMQPAbstractNotifier::Create<AMQPPublishRawBlockNotifier>;
51 factories["pubrawtx"] = AMQPAbstractNotifier::Create<AMQPPublishRawTransactionNotifier>;
52
53 for (std::map<std::string, AMQPNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i) {
54 std::map<std::string, std::string>::const_iterator j = args.find("-amqp" + i->first);
55 if (j!=args.end()) {
56 AMQPNotifierFactory factory = i->second;
57 std::string address = j->second;
58 AMQPAbstractNotifier *notifier = factory();
59 notifier->SetType(i->first);
60 notifier->SetAddress(address);
61 notifiers.push_back(notifier);
62 }
63 }
64
65 if (!notifiers.empty()) {
66 notificationInterface = new AMQPNotificationInterface();
67 notificationInterface->notifiers = notifiers;
68
69 if (!notificationInterface->Initialize()) {
70 delete notificationInterface;
71 notificationInterface = nullptr;
72 }
73 }
74
75 return notificationInterface;
76}
77
78// Called at startup to conditionally set up
79bool AMQPNotificationInterface::Initialize()
80{
81 LogPrint("amqp", "amqp: Initialize notification interface\n");
82
83 std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin();
84 for (; i != notifiers.end(); ++i) {
85 AMQPAbstractNotifier *notifier = *i;
86 if (notifier->Initialize()) {
87 LogPrint("amqp", "amqp: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
88 } else {
89 LogPrint("amqp", "amqp: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
90 break;
91 }
92 }
93
94 if (i != notifiers.end()) {
95 return false;
96 }
97
98 return true;
99}
100
101// Called during shutdown sequence
102void AMQPNotificationInterface::Shutdown()
103{
104 LogPrint("amqp", "amqp: Shutdown notification interface\n");
105
106 for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ++i) {
107 AMQPAbstractNotifier *notifier = *i;
108 notifier->Shutdown();
109 }
110}
111
112void AMQPNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex)
113{
114 for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) {
115 AMQPAbstractNotifier *notifier = *i;
116 if (notifier->NotifyBlock(pindex)) {
117 i++;
118 } else {
119 notifier->Shutdown();
120 i = notifiers.erase(i);
121 }
122 }
123}
124
125void AMQPNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
126{
127 for (std::list<AMQPAbstractNotifier*>::iterator i = notifiers.begin(); i != notifiers.end(); ) {
128 AMQPAbstractNotifier *notifier = *i;
129 if (notifier->NotifyTransaction(tx)) {
130 i++;
131 } else {
132 notifier->Shutdown();
133 i = notifiers.erase(i);
134 }
135 }
136}
This page took 0.114122 seconds and 4 git commands to generate.