]> Git Repo - VerusCoin.git/blob - src/amqp/amqppublishnotifier.cpp
Build fix
[VerusCoin.git] / src / amqp / amqppublishnotifier.cpp
1 // Copyright (c) 2017 The Zcash developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or https://www.opensource.org/licenses/mit-license.php .
4
5 #include "amqppublishnotifier.h"
6 #include "chainparams.h"
7 #include "main.h"
8 #include "util.h"
9
10 #include "amqpsender.h"
11
12 #include <memory>
13 #include <thread>
14
15 static std::multimap<std::string, AMQPAbstractPublishNotifier*> mapPublishNotifiers;
16
17 static const char *MSG_HASHBLOCK = "hashblock";
18 static const char *MSG_HASHTX    = "hashtx";
19 static const char *MSG_RAWBLOCK  = "rawblock";
20 static const char *MSG_RAWTX     = "rawtx";
21
22 // Invoke this method from a new thread to run the proton container event loop.
23 void AMQPAbstractPublishNotifier::SpawnProtonContainer()
24 {
25     try {
26         proton::default_container(*handler_).run();
27     }
28     catch (const proton::error_condition &e) {
29         LogPrint("amqp", "amqp: container error: %s\n", e.what());
30     }
31     catch (const std::runtime_error &e) {
32         LogPrint("amqp", "amqp: runtime error: %s\n", e.what());
33     }
34     catch (const std::exception &e) {
35         LogPrint("amqp", "amqp: exception: %s\n", e.what());
36     }
37     catch (...) {
38         LogPrint("amqp", "amqp: unknown error\n");
39     }
40     handler_->terminate();
41 }
42
43 bool AMQPAbstractPublishNotifier::Initialize()
44 {
45     std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
46
47     if (i == mapPublishNotifiers.end()) {
48         try {
49             handler_ = std::make_shared<AMQPSender>(address);
50             thread_ = std::make_shared<std::thread>(&AMQPAbstractPublishNotifier::SpawnProtonContainer, this);
51         }
52         catch (std::exception &e) {
53             LogPrint("amqp", "amqp: initialization error: %s\n", e.what());
54             return false;
55         }
56         mapPublishNotifiers.insert(std::make_pair(address, this));
57     } else {
58         // copy the shared ptrs to the message handler and the thread where the proton container is running
59         handler_ = i->second->handler_;
60         thread_ = i->second->thread_;
61         mapPublishNotifiers.insert(std::make_pair(address, this));
62     }
63
64     return true;
65 }
66
67
68 void AMQPAbstractPublishNotifier::Shutdown()
69 {
70     LogPrint("amqp", "amqp: Shutdown notifier %s at %s\n", GetType(), GetAddress());
71
72     int count = mapPublishNotifiers.count(address);
73
74     // remove this notifier from the list of publishers using this address
75     typedef std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator iterator;
76     std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
77
78     for (iterator it = iterpair.first; it != iterpair.second; ++it) {
79         if (it->second == this) {
80             mapPublishNotifiers.erase(it);
81             break;
82         }
83     }
84
85     // terminate the connection if this is the last publisher using this address
86     if (count == 1) {
87         handler_->terminate();
88         if (thread_.get() != nullptr) {
89             if (thread_->joinable()) {
90                 thread_->join();
91             }
92         }
93     }
94 }
95
96
97 bool AMQPAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
98 {
99     try { 
100         proton::binary content;
101         const char *p = (const char *)data;
102         content.assign(p, p + size);
103
104         proton::message message(content);
105         message.subject(std::string(command));
106         proton::message::property_map & props = message.properties();
107         props.put("x-opt-sequence-number", sequence_);
108         handler_->publish(message);
109
110     } catch (proton::error_condition &e) {
111         LogPrint("amqp", "amqp: error : %s\n", e.what());
112         return false;
113     }
114     catch (const std::runtime_error &e) {
115         LogPrint("amqp", "amqp: runtime error: %s\n", e.what());
116         return false;
117     }
118     catch (const std::exception &e) {
119         LogPrint("amqp", "amqp: exception: %s\n", e.what());
120         return false;
121     }
122     catch (...) {
123         LogPrint("amqp", "amqp: unknown error\n");
124         return false;
125     }
126
127     sequence_++;
128
129     return true;
130 }
131
132 bool AMQPPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
133 {
134     uint256 hash = pindex->GetBlockHash();
135     LogPrint("amqp", "amqp: Publish hashblock %s\n", hash.GetHex());
136     char data[32];
137     for (unsigned int i = 0; i < 32; i++)
138         data[31 - i] = hash.begin()[i];
139     return SendMessage(MSG_HASHBLOCK, data, 32);
140 }
141
142 bool AMQPPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
143 {
144     uint256 hash = transaction.GetHash();
145     LogPrint("amqp", "amqp: Publish hashtx %s\n", hash.GetHex());
146     char data[32];
147     for (unsigned int i = 0; i < 32; i++)
148         data[31 - i] = hash.begin()[i];
149     return SendMessage(MSG_HASHTX, data, 32);
150 }
151
152 bool AMQPPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
153 {
154     LogPrint("amqp", "amqp: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
155
156     const Consensus::Params& consensusParams = Params().GetConsensus();
157     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
158     {
159         LOCK(cs_main);
160         CBlock block;
161         if(!ReadBlockFromDisk(block, pindex, consensusParams)) {
162             LogPrint("amqp", "amqp: Can't read block from disk");
163             return false;
164         }
165
166         ss << block;
167     }
168
169     return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
170 }
171
172 bool AMQPPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
173 {
174     uint256 hash = transaction.GetHash();
175     LogPrint("amqp", "amqp: Publish rawtx %s\n", hash.GetHex());
176     CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
177     ss << transaction;
178     return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
179 }
This page took 0.035438 seconds and 4 git commands to generate.