examples/txrad.cpp¶

#include <atomic>

#include <bitcoin/bitcoin.hpp>
using namespace bc;

using std::placeholders::_1;
using std::placeholders::_2;

// Tracks how often each transaction hash has been seen. Every tracked
// hash carries a view count and the time it was first seen, so that
// hashes older than the timeout can be dropped again.
class tx_watch
  : public async_strand
{
public:
    // timeout is the age (in seconds, as measured by time()) beyond
    // which cleanup() discards an entry.
    tx_watch(threadpool& pool, time_t timeout = 200);

    // Record one sighting of tx_hash. A hash that is already tracked has
    // its count incremented; an unknown hash gets a fresh entry.
    void push(const hash_digest& tx_hash);

    // Drop entries that have outlived the timeout. This is deliberately
    // a separate call instead of a hidden step inside push() or
    // display(): each method does exactly one thing.
    void cleanup();

    // Log the tracked hashes together with their counts. Handing out a
    // copy of the list (keeping view and model apart) would be the
    // cleaner design, but this example favours brevity.
    void display();

private:
    // One tracked transaction hash.
    struct entry_count
    {
        hash_digest tx_hash;
        // Number of times the hash has been pushed.
        size_t count;
        // When the hash was first seen.
        time_t timest;
    };

    using entry_list = std::vector<entry_count>;

    // Workers behind the public methods. The public methods hand these
    // to async_strand::queue() and return immediately; queue() posts the
    // work to the threadpool and serializes it, so two operations queued
    // through the same async_strand never run at the same time.
    void do_push(const hash_digest& tx_hash);
    void do_cleanup();
    void do_display();

    // All hashes currently being tracked.
    entry_list entries_;
    // Maximum age of an entry before it may be removed.
    time_t timeout_;
};

// Passes the threadpool on to the async_strand base and remembers the
// entry timeout. The entry list starts out empty.
tx_watch::tx_watch(threadpool& pool, time_t timeout)
  : async_strand(pool),
    timeout_(timeout)
{
}

void tx_watch::push(const hash_digest& tx_hash)
{
    queue(std::bind(&tx_watch::do_push, this, tx_hash));
    // Returns immediately.
}
void tx_watch::do_push(const hash_digest& tx_hash)
{
    // If tx_hash is found then increment count...
    bool is_found = false;
    for (entry_count& entry: entries_)
        if (entry.tx_hash == tx_hash)
        {
            ++entry.count;
            is_found = true;
        }
    // Else create a new entry with a count of 1.
    if (!is_found)
        entries_.push_back({tx_hash, 1, time(nullptr)});
}

void tx_watch::cleanup()
{
    queue(std::bind(&tx_watch::do_cleanup, this));
}
void tx_watch::do_cleanup()
{
    // Erase entries where timest is older than (now - timeout_) seconds.
    time_t current_time = time(nullptr);
    auto erase_pred =
        [&](const entry_count& entry)
        {
            return (current_time - entry.timest) > timeout_;
        };
    auto erase_begin =
        std::remove_if(entries_.begin(), entries_.end(), erase_pred);
    // If we have old entries to delete then erase them.
    if (erase_begin != entries_.end())
        entries_.erase(erase_begin);
}

void tx_watch::display()
{
    queue(std::bind(&tx_watch::do_display, this));
}
void tx_watch::do_display()
{
    // Sort entries by count. Highest numbers at the top.
    std::sort(entries_.begin(), entries_.end(),
        [](const entry_count& entry_a, const entry_count& entry_b)
        {
            return entry_a.count > entry_b.count;
        });
    // Display the first 20 entries.
    for (size_t i = 0; i < 20 && i < entries_.size(); ++i)
    {
        const entry_count& entry = entries_[i];
        log_info() << entry.tx_hash << " " << entry.count;
    }
}

// Logs the error message as fatal and terminates the process with exit
// code -1 when ec holds an error; does nothing otherwise.
// Exiting abruptly is acceptable here: no database is open and no
// critical file operations are in flight.
void check_error(const std::error_code& ec)
{
    if (ec)
    {
        log_fatal() << ec.message();
        exit(-1);
    }
}

// Shutdown flag shared between the C signal callback and main(), which
// polls it to leave its loop. It is atomic because a plain bool that is
// written from a signal handler while another thread reads it is a data
// race.
std::atomic<bool> stopped(false);
// C signal callback: logs the signal number and requests shutdown.
void signal_handler(int sig)
{
    // NOTE(review): logging from a signal handler is presumably not
    // async-signal-safe. It is kept to preserve the existing output --
    // confirm, or move the message into main() after the loop exits.
    log_info() << "Caught signal: " << sig;
    stopped = true;
}

// Completion handler passed to protocol::start().
// Any error is fatal (see check_error); on success a debug line is
// logged. Per the original note, node discovery is complete by now.
void handle_start(const std::error_code& ec)
{
    check_error(ec);

    log_debug() << "Started.";
}

// Forward declarations. connection_started() binds inventory_received()
// as a handler before the latter is defined, and also rebinds itself.
void connection_started(const std::error_code& ec, channel_ptr node,
    protocol& prot, tx_watch& watch);
void inventory_received(const std::error_code& ec, const inventory_type& inv,
    channel_ptr node, tx_watch& watch);

// Handler for new-channel notifications from the protocol service.
// On failure a warning is logged and nothing else happens (in particular
// the channel subscription is not renewed). On success the node's
// inventory packets are routed to inventory_received() and this handler
// is registered again for the next channel.
void connection_started(const std::error_code& ec, channel_ptr node,
    protocol& prot, tx_watch& watch)
{
    if (!ec)
    {
        log_info() << "Connection established.";
        // Listen for inventory packets from this node.
        node->subscribe_inventory(
            std::bind(inventory_received,
                _1, _2, node, std::ref(watch)));
        // Ask to be told about the next new node as well.
        prot.subscribe_channel(
            std::bind(connection_started,
                _1, _2, std::ref(prot), std::ref(watch)));
    }
    else
    {
        log_warning() << "Couldn't start connection: " << ec.message();
    }
}

// Handler for an inventory packet from node. Any error is fatal (see
// check_error). Every transaction hash in the packet is pushed to
// watch; other inventory types are ignored. Finally the handler is
// registered again for the node's next inventory packet.
void inventory_received(const std::error_code& ec, const inventory_type& inv,
    channel_ptr node, tx_watch& watch)
{
    check_error(ec);
    for (const inventory_vector_type& item: inv.inventories)
    {
        // Only transactions are of interest.
        if (item.type == inventory_type_id::transaction)
            watch.push(item.hash);
    }
    // Keep listening to this node.
    node->subscribe_inventory(
        std::bind(inventory_received,
            _1, _2, node, std::ref(watch)));
}

int main()
{
    threadpool pool(4);
    // Create dependencies for our protocol object.
    hosts hst(pool);
    handshake hs(pool);
    network net(pool);
    // protocol service.
    protocol prot(pool, hst, hs, net);
    // Perform node discovery if needed and then creating connections.
    prot.start(handle_start);
    // Our table tracking transaction counts.
    tx_watch watch(pool, 200);
    // Notify us of new connections.
    prot.subscribe_channel(
        std::bind(connection_started, _1, _2, std::ref(prot), std::ref(watch)));
    // Catch C signals for stopping the program.
    signal(SIGABRT, signal_handler);
    signal(SIGTERM, signal_handler);
    signal(SIGINT, signal_handler);
    while (!stopped)
    {
        watch.cleanup();
        watch.display();
        sleep(10);
    }
    // Safely close down.
    pool.stop();
    pool.join();
    return 0;
}

Previous topic

examples/proto.cpp

Next topic

examples/fullnode.cpp

This Page