#include <algorithm>
#include <atomic>

#include <bitcoin/bitcoin.hpp>
using namespace bc;
using std::placeholders::_1;
using std::placeholders::_2;
// Keeps a tally of how many times each transaction hash has been seen,
// together with the time of the first sighting so that stale hashes can
// be dropped again.
class tx_watch
  : public async_strand
{
public:
    // pool:    threadpool passed on to the async_strand base.
    // timeout: age in seconds after which cleanup() removes an entry.
    tx_watch(threadpool& pool, time_t timeout=200);

    // Record one sighting of tx_hash. A known hash gets its counter
    // incremented; an unknown hash gets a new entry.
    void push(const hash_digest& tx_hash);

    // Remove expired entries. This could be triggered implicitly from
    // push() or display(), but keeping each method single-purpose and
    // free of hidden side effects is the cleaner design.
    void cleanup();

    // Log the transactions and their counts. Separating view from model
    // (e.g. a method that returns a copy of the list) would be a better
    // design, but this example keeps things simple.
    void display();

private:
    // One tracked transaction hash.
    struct entry_count
    {
        hash_digest tx_hash;
        // Number of times the hash has been pushed.
        size_t count;
        // Time at which the hash was first seen.
        time_t timest;
    };
    using entry_list = std::vector<entry_count>;

    // Workers behind the public methods. The public methods only hand
    // these to async_strand::queue(), which posts the work to the
    // threadpool and serializes it: no two operations queued through the
    // same async_strand run at the same time. The public methods then
    // return immediately.
    void do_push(const hash_digest& tx_hash);
    void do_cleanup();
    void do_display();

    entry_list entries_;
    // How long an entry may live before it is eligible for removal.
    time_t timeout_;
};
// Passes the threadpool to the async_strand base and remembers the expiry
// timeout that do_cleanup() compares entry ages against.
tx_watch::tx_watch(threadpool& pool, time_t timeout)
  : async_strand(pool),
    timeout_(timeout)
{
}
void tx_watch::push(const hash_digest& tx_hash)
{
queue(std::bind(&tx_watch::do_push, this, tx_hash));
// Returns immediately.
}
void tx_watch::do_push(const hash_digest& tx_hash)
{
// If tx_hash is found then increment count...
bool is_found = false;
for (entry_count& entry: entries_)
if (entry.tx_hash == tx_hash)
{
++entry.count;
is_found = true;
}
// Else create a new entry with a count of 1.
if (!is_found)
entries_.push_back({tx_hash, 1, time(nullptr)});
}
void tx_watch::cleanup()
{
queue(std::bind(&tx_watch::do_cleanup, this));
}
// Worker for cleanup(): removes every entry whose first-seen timestamp is
// more than timeout_ seconds in the past.
void tx_watch::do_cleanup()
{
    const time_t current_time = time(nullptr);
    const auto is_expired =
        [&](const entry_count& entry)
        {
            return (current_time - entry.timest) > timeout_;
        };

    // std::remove_if only shifts the surviving entries to the front and
    // returns the start of the leftover tail. The whole tail has to be
    // erased with the two-iterator overload; erase(iterator) alone would
    // drop a single element and leave the remaining leftovers in the list.
    // Erasing an empty range is a no-op, so no extra check is needed.
    entries_.erase(
        std::remove_if(entries_.begin(), entries_.end(), is_expired),
        entries_.end());
}
void tx_watch::display()
{
queue(std::bind(&tx_watch::do_display, this));
}
void tx_watch::do_display()
{
// Sort entries by count. Highest numbers at the top.
std::sort(entries_.begin(), entries_.end(),
[](const entry_count& entry_a, const entry_count& entry_b)
{
return entry_a.count > entry_b.count;
});
// Display the first 20 entries.
for (size_t i = 0; i < 20 && i < entries_.size(); ++i)
{
const entry_count& entry = entries_[i];
log_info() << entry.tx_hash << " " << entry.count;
}
}
// Terminates the process when ec signals an error; does nothing otherwise.
// No database is open and no critical file operations are in progress,
// so exiting abruptly is acceptable here.
void check_error(const std::error_code& ec)
{
    if (ec)
    {
        log_fatal() << ec.message();
        exit(-1);
    }
}
// Stop flag shared between the C signal handler and the polling loop in
// main(). It is an atomic rather than a plain bool: a signal handler may
// only portably write to lock-free atomics (or volatile sig_atomic_t),
// and the flag is read outside the handler.
std::atomic<bool> stopped(false);

// C signal callback: logs the signal number and requests shutdown by
// setting the stop flag.
void signal_handler(int sig)
{
    // NOTE(review): logging from a signal handler is kept from the
    // original, but stream output is generally not async-signal-safe;
    // consider moving the message to the main loop.
    log_info() << "Caught signal: " << sig;
    stopped = true;
}
// Completion handler passed to protocol::start(): called once the protocol
// has started (node discovery complete). Any error is fatal via
// check_error(); on success a debug message is logged.
void handle_start(const std::error_code& ec)
{
    check_error(ec);

    log_debug() << "Started.";
}
// Forward declarations of the two network callbacks. connection_started()
// binds inventory_received() as a handler before that function's
// definition appears, so both are declared up front.
void connection_started(const std::error_code& ec, channel_ptr node,
protocol& prot, tx_watch& watch);
void inventory_received(const std::error_code& ec, const inventory_type& inv,
channel_ptr node, tx_watch& watch);
// Channel subscription handler.
//
// ec:    error reported for the connection attempt; on error a warning is
//        logged and nothing is subscribed.
// node:  the channel that was established.
// prot:  protocol object used to re-register this handler.
// watch: transaction table that inventory_received() feeds.
void connection_started(const std::error_code& ec, channel_ptr node,
    protocol& prot, tx_watch& watch)
{
    if (ec)
    {
        log_warning() << "Couldn't start connection: " << ec.message();
        return;
    }
    log_info() << "Connection established.";

    // Listen for inventory packets arriving on this channel.
    auto on_inventory =
        std::bind(inventory_received, _1, _2, node, std::ref(watch));
    node->subscribe_inventory(on_inventory);

    // Register again so that further new channels are reported too.
    auto on_channel =
        std::bind(connection_started, _1, _2, std::ref(prot), std::ref(watch));
    prot.subscribe_channel(on_channel);
}
// Inventory subscription handler.
//
// Any error is passed to check_error(), which terminates the process.
// Otherwise every transaction hash contained in inv is pushed to watch,
// and the handler re-subscribes itself on the same channel.
void inventory_received(const std::error_code& ec, const inventory_type& inv,
    channel_ptr node, tx_watch& watch)
{
    check_error(ec);

    // Only transaction inventories are of interest; everything else is
    // ignored.
    for (const inventory_vector_type& item: inv.inventories)
    {
        if (item.type == inventory_type_id::transaction)
            watch.push(item.hash);
    }

    // Subscribe again to receive further inventory packets.
    node->subscribe_inventory(
        std::bind(inventory_received, _1, _2, node, std::ref(watch)));
}
// Program entry point: wires up the network stack, tracks announced
// transaction hashes in a tx_watch, and every 10 seconds cleans up and
// displays the table until a stop signal is caught.
int main()
{
    threadpool pool(4);

    // Services the protocol object depends on.
    hosts hst(pool);
    handshake hs(pool);
    network net(pool);

    // The protocol service itself.
    protocol prot(pool, hst, hs, net);
    // Performs node discovery if needed and then creates connections.
    prot.start(handle_start);

    // Table of transaction hashes and how often each was seen.
    tx_watch watch(pool, 200);

    // Ask to be told about new connections.
    auto on_channel =
        std::bind(connection_started, _1, _2, std::ref(prot), std::ref(watch));
    prot.subscribe_channel(on_channel);

    // Route the C signals that should stop the program to our handler.
    const int stop_signals[] = {SIGABRT, SIGTERM, SIGINT};
    for (const int signum: stop_signals)
        signal(signum, signal_handler);

    // Periodic maintenance until the handler sets the stop flag.
    while (!stopped)
    {
        watch.cleanup();
        watch.display();
        sleep(10);
    }

    // Shut the threadpool down in an orderly fashion.
    pool.stop();
    pool.join();
    return 0;
}