Dash Core  0.12.2.1
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include "chainparams.h"
6 #include "streams.h"
7 #include "zmqpublishnotifier.h"
8 #include "validation.h"
9 #include "util.h"
10 
11 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
12 
13 static const char *MSG_HASHBLOCK = "hashblock";
14 static const char *MSG_HASHTX = "hashtx";
15 static const char *MSG_HASHTXLOCK = "hashtxlock";
16 static const char *MSG_RAWBLOCK = "rawblock";
17 static const char *MSG_RAWTX = "rawtx";
18 static const char *MSG_RAWTXLOCK = "rawtxlock";
19 
20 // Internal function to send multipart message
21 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 {
23  va_list args;
24  va_start(args, size);
25 
26  while (1)
27  {
28  zmq_msg_t msg;
29 
30  int rc = zmq_msg_init_size(&msg, size);
31  if (rc != 0)
32  {
33  zmqError("Unable to initialize ZMQ msg");
34  return -1;
35  }
36 
37  void *buf = zmq_msg_data(&msg);
38  memcpy(buf, data, size);
39 
40  data = va_arg(args, const void*);
41 
42  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
43  if (rc == -1)
44  {
45  zmqError("Unable to send ZMQ msg");
46  zmq_msg_close(&msg);
47  return -1;
48  }
49 
50  zmq_msg_close(&msg);
51 
52  if (!data)
53  break;
54 
55  size = va_arg(args, size_t);
56  }
57  return 0;
58 }
59 
61 {
    // Gives this notifier a socket for `address`: a new ZMQ_PUB socket is created and
    // bound when no registered notifier uses the address yet; otherwise the socket of an
    // already registered notifier is shared. Returns true on success, false when socket
    // creation or binding fails.
    // NOTE(review): the declarator line of this body is absent from this listing;
    // presumably `bool Initialize(void *pcontext)` of a publish notifier class - confirm.
62  assert(!psocket);
63 
64  // check if address is being used by other publish notifier
65  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
66 
67  if (i==mapPublishNotifiers.end())
68  {
    // first notifier for this address: create and bind a fresh publisher socket
69  psocket = zmq_socket(pcontext, ZMQ_PUB);
70  if (!psocket)
71  {
72  zmqError("Failed to create socket");
73  return false;
74  }
75 
76  int rc = zmq_bind(psocket, address.c_str());
77  if (rc!=0)
78  {
79  zmqError("Failed to bind address");
    // NOTE(review): psocket is closed here but not reset, so it keeps the stale
    // handle value after this failure path - confirm callers do not reuse it.
80  zmq_close(psocket);
81  return false;
82  }
83 
84  // register this notifier for the address, so it can be reused for other publish notifier
85  mapPublishNotifiers.insert(std::make_pair(address, this));
86  return true;
87  }
88  else
89  {
90  LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
91 
    // share the socket of the notifier found for this address and register this
    // notifier under the same address as well
92  psocket = i->second->psocket;
93  mapPublishNotifiers.insert(std::make_pair(address, this));
94 
95  return true;
96  }
97 }
98 
100 {
    // Unregisters this notifier from mapPublishNotifiers and closes the socket when this
    // notifier was the only one registered for `address`; psocket is cleared in all cases.
    // NOTE(review): the declarator line of this body is absent from this listing;
    // presumably the notifier's Shutdown() member - confirm.
101  assert(psocket);
102 
    // counted BEFORE this notifier's entry is erased, so count == 1 means this notifier
    // is the only one registered for the address
103  int count = mapPublishNotifiers.count(address);
104 
105  // remove this notifier from the list of publishers using this address
106  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
107  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
108 
109  for (iterator it = iterpair.first; it != iterpair.second; ++it)
110  {
111  if (it->second==this)
112  {
    // the iterator is invalidated by erase; the loop is left immediately
113  mapPublishNotifiers.erase(it);
114  break;
115  }
116  }
117 
118  if (count == 1)
119  {
120  LogPrint("zmq", "Close socket at address %s\n", address);
    // ZMQ_LINGER is set to 0 before closing; presumably so that close does not wait
    // for unsent messages - see the ZeroMQ zmq_setsockopt documentation
121  int linger = 0;
122  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
123  zmq_close(psocket);
124  }
125 
126  psocket = 0;
127 }
128 
129 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
130 {
131  assert(psocket);
132 
133  /* send three parts, command & data & a LE 4byte sequence number */
134  unsigned char msgseq[sizeof(uint32_t)];
135  WriteLE32(&msgseq[0], nSequence);
136  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
137  if (rc == -1)
138  return false;
139 
140  /* increment memory only sequence number after sending */
141  nSequence++;
142 
143  return true;
144 }
145 
147 {
    // Publishes the block hash of `pindex` under the MSG_HASHBLOCK command and returns
    // the result of SendMessage.
    // NOTE(review): the declarator line of this body is absent from this listing;
    // presumably a `bool NotifyBlock(const CBlockIndex *pindex)` override - confirm.
148  uint256 hash = pindex->GetBlockHash();
149  LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
150  char data[32];
    // copy the 32 hash bytes in reverse order: source byte i lands at index 31 - i
151  for (unsigned int i = 0; i < 32; i++)
152  data[31 - i] = hash.begin()[i];
153  return SendMessage(MSG_HASHBLOCK, data, 32);
154 }
155 
157 {
    // Publishes the hash of `transaction` under the MSG_HASHTX command and returns the
    // result of SendMessage.
    // NOTE(review): the declarator line of this body is absent from this listing;
    // presumably a `bool NotifyTransaction(const CTransaction &transaction)` override - confirm.
158  uint256 hash = transaction.GetHash();
159  LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
160  char data[32];
    // copy the 32 hash bytes in reverse order: source byte i lands at index 31 - i
161  for (unsigned int i = 0; i < 32; i++)
162  data[31 - i] = hash.begin()[i];
163  return SendMessage(MSG_HASHTX, data, 32);
164 }
165 
167 {
    // Publishes the hash of `transaction` under the MSG_HASHTXLOCK command and returns
    // the result of SendMessage.
    // NOTE(review): the declarator line of this body is absent from this listing;
    // presumably a `bool NotifyTransactionLock(const CTransaction &transaction)` override - confirm.
168  uint256 hash = transaction.GetHash();
169  LogPrint("zmq", "zmq: Publish hashtxlock %s\n", hash.GetHex());
170  char data[32];
    // copy the 32 hash bytes in reverse order: source byte i lands at index 31 - i
171  for (unsigned int i = 0; i < 32; i++)
172  data[31 - i] = hash.begin()[i];
173  return SendMessage(MSG_HASHTXLOCK, data, 32);
174 }
175 
177 {
    // Reads the block referenced by `pindex` from disk, serializes it into `ss` and
    // publishes the serialized bytes under the MSG_RAWBLOCK command. Returns false when
    // the block cannot be read, otherwise the result of SendMessage.
    // NOTE(review): the declarator line and the declaration of `ss` (listing line 181)
    // are absent from this listing - confirm both against the original file.
178  LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
179 
180  const Consensus::Params& consensusParams = Params().GetConsensus();
182  {
    // cs_main is locked inside this inner scope, which covers the disk read and the
    // serialization; SendMessage is called after the scope has been closed
183  LOCK(cs_main);
184  CBlock block;
185  if(!ReadBlockFromDisk(block, pindex, consensusParams))
186  {
187  zmqError("Can't read block from disk");
188  return false;
189  }
190 
191  ss << block;
192  }
193 
194  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
195 }
196 
198 {
    // Serializes `transaction` into `ss` and publishes the serialized bytes under the
    // MSG_RAWTX command; returns the result of SendMessage.
    // NOTE(review): the declarator line and the declaration of `ss` (listing line 201)
    // are absent from this listing - confirm both against the original file.
199  uint256 hash = transaction.GetHash();
200  LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
202  ss << transaction;
203  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
204 }
205 
207 {
    // Serializes `transaction` into `ss` and publishes the serialized bytes under the
    // MSG_RAWTXLOCK command; returns the result of SendMessage.
    // NOTE(review): the declarator line and the declaration of `ss` (listing line 210)
    // are absent from this listing - confirm both against the original file.
208  uint256 hash = transaction.GetHash();
209  LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex());
211  ss << transaction;
212  return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
213 }
static const char * MSG_HASHBLOCK
bool NotifyBlock(const CBlockIndex *pindex)
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:55
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
bool NotifyTransactionLock(const CTransaction &transaction)
CCriticalSection cs_main
Definition: validation.cpp:62
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:36
static const char * MSG_HASHTX
unsigned char * begin()
Definition: uint256.h:55
static int LogPrint(const char *category, const char *format)
Definition: util.h:126
bool SendMessage(const char *command, const void *data, size_t size)
const_iterator begin() const
Definition: streams.h:118
#define LOCK(cs)
Definition: sync.h:168
static const char * MSG_HASHTXLOCK
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
bool NotifyBlock(const CBlockIndex *pindex)
bool NotifyTransaction(const CTransaction &transaction)
static const char * MSG_RAWTX
static const char * MSG_RAWBLOCK
const CChainParams & Params()
void * memcpy(void *a, const void *b, size_t c)
static const int PROTOCOL_VERSION
Definition: version.h:13
const uint256 & GetHash() const
Definition: transaction.h:262
bool NotifyTransaction(const CTransaction &transaction)
uint256 GetBlockHash() const
Definition: chain.h:218
static const char * MSG_RAWTXLOCK
std::string GetHex() const
Definition: uint256.cpp:21
static int count
Definition: tests.c:41
bool NotifyTransactionLock(const CTransaction &transaction)
bool Initialize(void *pcontext)
bool ReadBlockFromDisk(CBlock &block, const CDiskBlockPos &pos, const Consensus::Params &consensusParams)
size_type size() const
Definition: streams.h:122
Definition: block.h:73
void zmqError(const char *str)