1 // Copyright (c) 2017 The Zcash developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or https://www.opensource.org/licenses/mit-license.php .
5 #include "amqppublishnotifier.h"
6 #include "chainparams.h"
10 #include "amqpsender.h"
15 static std::multimap<std::string, AMQPAbstractPublishNotifier*> mapPublishNotifiers;
17 static const char *MSG_HASHBLOCK = "hashblock";
18 static const char *MSG_HASHTX = "hashtx";
19 static const char *MSG_RAWBLOCK = "rawblock";
20 static const char *MSG_RAWTX = "rawtx";
22 // Invoke this method from a new thread to run the proton container event loop.
23 void AMQPAbstractPublishNotifier::SpawnProtonContainer()
26 proton::default_container(*handler_).run();
28 catch (const proton::error_condition &e) {
29 LogPrint("amqp", "amqp: container error: %s\n", e.what());
31 catch (const std::runtime_error &e) {
32 LogPrint("amqp", "amqp: runtime error: %s\n", e.what());
34 catch (const std::exception &e) {
35 LogPrint("amqp", "amqp: exception: %s\n", e.what());
38 LogPrint("amqp", "amqp: unknown error\n");
40 handler_->terminate();
43 bool AMQPAbstractPublishNotifier::Initialize()
45 std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
47 if (i == mapPublishNotifiers.end()) {
49 handler_ = std::make_shared<AMQPSender>(address);
50 thread_ = std::make_shared<std::thread>(&AMQPAbstractPublishNotifier::SpawnProtonContainer, this);
52 catch (std::exception &e) {
53 LogPrint("amqp", "amqp: initialization error: %s\n", e.what());
56 mapPublishNotifiers.insert(std::make_pair(address, this));
58 // copy the shared ptrs to the message handler and the thread where the proton container is running
59 handler_ = i->second->handler_;
60 thread_ = i->second->thread_;
61 mapPublishNotifiers.insert(std::make_pair(address, this));
68 void AMQPAbstractPublishNotifier::Shutdown()
70 LogPrint("amqp", "amqp: Shutdown notifier %s at %s\n", GetType(), GetAddress());
72 int count = mapPublishNotifiers.count(address);
74 // remove this notifier from the list of publishers using this address
75 typedef std::multimap<std::string, AMQPAbstractPublishNotifier*>::iterator iterator;
76 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
78 for (iterator it = iterpair.first; it != iterpair.second; ++it) {
79 if (it->second == this) {
80 mapPublishNotifiers.erase(it);
85 // terminate the connection if this is the last publisher using this address
87 handler_->terminate();
88 if (thread_.get() != nullptr) {
89 if (thread_->joinable()) {
97 bool AMQPAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
100 proton::binary content;
101 const char *p = (const char *)data;
102 content.assign(p, p + size);
104 proton::message message(content);
105 message.subject(std::string(command));
106 proton::message::property_map & props = message.properties();
107 props.put("x-opt-sequence-number", sequence_);
108 handler_->publish(message);
110 } catch (proton::error_condition &e) {
111 LogPrint("amqp", "amqp: error : %s\n", e.what());
114 catch (const std::runtime_error &e) {
115 LogPrint("amqp", "amqp: runtime error: %s\n", e.what());
118 catch (const std::exception &e) {
119 LogPrint("amqp", "amqp: exception: %s\n", e.what());
123 LogPrint("amqp", "amqp: unknown error\n");
132 bool AMQPPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
134 uint256 hash = pindex->GetBlockHash();
135 LogPrint("amqp", "amqp: Publish hashblock %s\n", hash.GetHex());
137 for (unsigned int i = 0; i < 32; i++)
138 data[31 - i] = hash.begin()[i];
139 return SendMessage(MSG_HASHBLOCK, data, 32);
142 bool AMQPPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
144 uint256 hash = transaction.GetHash();
145 LogPrint("amqp", "amqp: Publish hashtx %s\n", hash.GetHex());
147 for (unsigned int i = 0; i < 32; i++)
148 data[31 - i] = hash.begin()[i];
149 return SendMessage(MSG_HASHTX, data, 32);
152 bool AMQPPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
154 LogPrint("amqp", "amqp: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
156 const Consensus::Params& consensusParams = Params().GetConsensus();
157 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
161 if(!ReadBlockFromDisk(block, pindex, consensusParams)) {
162 LogPrint("amqp", "amqp: Can't read block from disk");
169 return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
172 bool AMQPPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
174 uint256 hash = transaction.GetHash();
175 LogPrint("amqp", "amqp: Publish rawtx %s\n", hash.GetHex());
176 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
178 return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());