Streaming Tweets to InfluxDB in Node.js

This week, I’ve been exploring the InfluxData tech stack. As a muse, I decided to turn some of my social media sharing patterns into formal algorithms. I also want to use my blog as a source for keywords, filter out profanity, and apply sentiment analysis to surface relevant topics in the tweets.

GitHub repo for this article: github.com/paulsbruce/InfluxTwitterExample

What Is InfluxData?

Simply put, it’s a modern engine for metrics and events. You stream data in, then you have a whole host of options for real-time processing and analysis. Their platform diagram speaks volumes:

From InfluxData.com Telegraf Overview

Built entirely from open source components, the InfluxData platform has huge advantages over competitors in terms of extensibility, language support, and community. They also have cloud and enterprise options for when you need to scale your processing up.

For now, I want to run stuff locally, so I went with the free sandbox environment. Again, it’s a completely open source stack, which is very cool of them, since much of their own work ends up as OSS contributions to those components.

Why Process Twitter Events in InfluxDB?

Well, frankly, it’s an easy source for real-time data. I don’t have a 24/7 Jenkins instance or a paid stream of enterprise data flowing in right now, but if I did, I would have started there because they already have a Jenkins data plugin. 🙂

But Twitter, just like every social media platform, is a firehose of semi-curated data. I want to share truly relevant information, not the rest of the garbage. To do this, I can pre-filter based on keywords from my blog and ‘friendlies’ that I’ve trusted enough to re-share in the past.

The point is not to automatically re-share content (which would be botty), but to queue up things in a buffer that would likely be something I would re-tweet. Then I can approve or reject these suggestions, which in turn can be a data stream to improve the machine learning algorithms that I will build as Kapacitor user-defined functions later on.

Streaming Data Into InfluxDB

There’s a huge list of existing, ready-to-go plugins for Telegraf, the collection agent. They’ve pretty much thought of everything, but I’m a hard-knocks kind of guy. I want to play with the InfluxDB APIs, so for my exploration I decided to write a standalone process in Node.js to insert data directly into InfluxDB.

To start, let’s declare some basic structures in Node to work with InfluxDB:

  • dbname: the InfluxDB database to insert into
  • measure: the measurement (correlates to relational table) to store data with
  • fields: the specific instance data points to collect on every relevant Tweet
  • tags: an extensible list of topic-based keywords to associate with the data
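A minimal sketch of those declarations (the database and measurement names, and the exact field list, are my illustrative choices here, not necessarily what the original repo uses):

```javascript
// Basic structures for the insert code below. The database/measurement
// names and the exact field list are illustrative assumptions.
const dbname = 'tweets';    // the InfluxDB database to insert into
const measure = 'tweet';    // the measurement (correlates to a relational table)

// Fields: the specific instance data points to collect on every relevant tweet
const fields = {
  id_str: 'string',         // tweet id
  screen_name: 'string',    // author handle
  text: 'string',           // tweet body
  relevance: 'float'        // computed relevance score
};

// Tags: an extensible list of topic-based keywords to associate with the data
const tags = ['keyword'];
```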

Making Sure That the Database Is Created

Of course, we need to ensure that there’s a place and schema for our Twitter data points to land as they come in. That’s simple:
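A sketch against the node-influx client’s Promise API (getDatabaseNames and createDatabase both return Promises there); the client instance is passed in, and the ‘tweets’ database name is my assumption:

```javascript
// Ensure the target database exists before streaming data into it.
// `influx` is a node-influx InfluxDB client instance.
function ensureDatabase(influx, dbname) {
  return influx.getDatabaseNames()
    .then(names =>
      names.includes(dbname)
        ? dbname                                      // already exists
        : influx.createDatabase(dbname).then(() => dbname))
    .catch(err => {
      console.error(`Could not ensure database ${dbname}:`, err.message);
      throw err;                                      // let callers decide
    });
}
```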

Saving Pre-screened Tweets as InfluxDB Data Points

Minus the plumbing of the Twitter API, inserting Tweets as data points into InfluxDB is also very easy. We simply need to match our internal data structure to that of the schema above:
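A sketch of that mapping, modeled on the node-influx ‘writePoints’ point shape (which tweet properties to keep is an assumption on my part):

```javascript
// Map a raw tweet onto an InfluxDB point (node-influx `writePoints` shape).
// The particular tweet properties kept here are illustrative choices.
function tweetToPoint(tweet, keyword) {
  return {
    measurement: 'tweet',
    tags: { keyword },                     // topic keyword this tweet matched
    fields: {
      id_str: tweet.id_str,
      screen_name: tweet.user.screen_name,
      text: tweet.text,
      raw: JSON.stringify(tweet)           // raw payload kept for later analysis
    },
    timestamp: new Date(tweet.created_at)
  };
}

// Insert a point; the influx client instance is passed in.
function saveTweet(influx, tweet, keyword) {
  return influx
    .writePoints([tweetToPoint(tweet, keyword)])
    .catch(err => console.error(`Error saving tweet: ${err.message}`));
}
```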

Notice that the keywords (tags) can be a simple JavaScript array of strings. I’m also optionally inserting the raw data for later analysis, but aggregating some of the most useful information as fields for InfluxQL queries.

The InfluxDB Node.js client respects ES6 Promises, as we can see with the ‘.catch’ handler. Huge help. This allows us to create robust promise chains with easy-to-read syntax. For more on Promises, read this article.

Verifying the Basic Data Stream

To see that the data is properly flowing into the InfluxData platform, we can use Chronograf in a local sandbox environment and build some simple graphs:

To do this, we use the Graph Editor to write a basic InfluxQL statement:
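The statement was along these lines (measurement, retention policy, and tag names here match the schema sketched earlier, so treat it as a template rather than the exact query):

```sql
SELECT count("text") AS "tweets"
FROM "tweets"."autogen"."tweet"
WHERE time > now() - 1h
GROUP BY time(1m), "keyword"
```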

The simple graph shows a flow of relevant tweets grouped by keyword so we can easily visualize as real-time data comes in.

A Few Ideas and Next Steps

Of the many benefits of the InfluxData platform, stream processing in Kapacitor seems to be one of the most interesting areas.

Moving forward I’d like to:

  1. Move Sentiment Analysis with Rosette from Node into Kapacitor
  2. Add Machine Learning into Kapacitor for
    A) clarifying the relevance of keywords based on sentiment entity extraction
    B) extracting information about the positivity / negativity of the tweet
  3. Catch high-relevance notifications and send to Buffer ‘For Review’ queue
    A) accepts and rejects factor back into machine learning algorithm
    B) follow-up statistics about re-shares further inform ML algorithm
  4. Have Kapacitor alert when:
    A) specific high-priority keywords are used (use ML based on my tweets)
    B) aggregate relevance for a given keyword spikes (hot topic)
    C) a non-tracked keyword/phrase is used in multiple relevant tweets
    (could be a related topic I should track, event hashtag, or something else)

As You Build Your Own, Reach Out!

I’m sure as I continue to implement some of these ideas, I’ll need help. Fortunately, Influx has a pretty active and helpful Community site. Everything from large exports to plugin development to IoT gateways is discussed there. Jack Zampolin, David Simmons, and even CTO Paul Dix are just a few of the regular contributors to the conversation over there.

And as always, I like to help. As you work through your own exploration of InfluxData, feel free to reach out via Twitter or LinkedIn if you have comments, questions, or ideas.

Wrangling Promises in Node.js: 3 Misconceptions Resolved

ES6 (i.e. modern JavaScript) isn’t the first place Promises appeared, but it now supports them formally. Believe me, you want to start using Promises in your Node.js scripts, but if you’re new to the pattern, it can be tricky to get your head around. That’s what I hope to help you do in the next 5 minutes.

What Are JavaScript/ES6 Promises?

My way of explaining it is that Promises are a chaining pattern, a convention that helps decouple your code blocks from your execution pattern. Promises can dramatically improve your approach to asynchronous programming (which Node.js 8+ prefers) and simplify your callbacks by helping you express them in a linear fashion.

from Promise (object) on MDN web docs

The really easy thing about them is that a Promise always ends in one of two states:

  • Become fulfilled with a value
  • Become rejected with an exception

Consider the following code example:

In the above example, the ‘fetchJSON’ function returns a Promise, not the result of executing the request. Expressing things this way allows us to execute the code immediately, or as part of an asynchronous chain, such as:
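For instance (with a stub standing in for fetchJSON so the chain shape is the focus, and a hypothetical endpoint):

```javascript
// Stub standing in for the real fetchJSON; resolves with a fixed object.
function fetchJSON(url) {
  return Promise.resolve({ url, items: [1, 2, 3] });
}

// The chain reads top-to-bottom; no nested callbacks required.
fetchJSON('https://api.example.com/items')   // hypothetical endpoint
  .then(data => data.items.length)           // transform the fulfilled value
  .then(count => console.log(`got ${count} items`))
  .catch(err => console.error('request failed:', err.message));
```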

What’s the alternative? Well…I hesitate to show you (the interweb loves to copy and paste) because we would have to:

  • express every asynchronous action as a callback function (which is bulky)
  • indent/embed blocks in a recursive step pattern
  • chain commands by calling the next function from our executing function

So far, I’ve made a career of learning how to stand up and say ‘I will not build that’. We should do that more often #facebook and you should read this.

The 3 Misconceptions You Want to Immediately Overcome

Amongst the many I had while learning to use Promises, these are the top three that I, and most others, struggle to overcome:

  1. You can’t mix synchronous/callback-oriented code with Promise-based code
  2. It’s okay to ignore catching errors because it’s optional to the Promise chain
  3. There’s no way to join parallel executing paths of asynchronous Promises

I focus on these 3 misconceptions because when they’re not in your head, you can focus on the simplicity of Promise code. When writing, ask yourself: “is the code I just wrote elegant?” If the answer is no, chances are you’re getting hung up on a misconception.

Mixing Synchronous/Callback-Oriented Code with Promises

You CAN inject legacy synchronous code (code that doesn’t emit Promises), but you have to handle Promise-related tie-ins manually. The code example in the last section does exactly that with the ‘request’ function. However you DO have to wrap it in a function/lambda that eventually calls the ‘resolve’ or ‘reject’ handlers.

For instance, in a recent integration to Twitter, their ‘stream’ function does not return a Promise and only provides a callback mechanism, so I wrapped it to resolve or reject:
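A sketch of that wrapper, modeled on the npm ‘twitter’ package’s client.stream(path, params, callback) shape (the path, params, and handler details are assumptions):

```javascript
// Promisify a callback-only streaming API. If the stream errors before
// any data arrives, the Promise rejects; otherwise it resolves with the
// live stream on the first received event.
function openStream(client, keywords, onTweet) {
  return new Promise((resolve, reject) => {
    client.stream('statuses/filter', { track: keywords.join(',') }, stream => {
      stream.on('data', event => {
        resolve(stream);   // first event: negotiation clearly succeeded
        onTweet(event);    // per-event errors are handled here, not bubbled
      });
      stream.on('error', err => reject(err)); // a no-op once already resolved
    });
  });
}
```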

I decided to ‘Promisify’ this functionality because I wanted to wrap it in a Promise-based retry mechanism, so that the script would only fail out entirely after multiple attempts at stream negotiation had failed. I opted to pull in the ‘promise-retry’ package from npmjs. This simplified the calling code dramatically:
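The calling pattern looks roughly like this. To keep the sketch dependency-free I hand-roll a minimal retry instead of using the package itself; ‘promise-retry’ layers backoff options over the same idea:

```javascript
// Minimal retry over a Promise-returning function: re-invoke on failure,
// reject only after all attempts are exhausted.
function retry(fn, attempts) {
  return fn().catch(err =>
    attempts > 1
      ? retry(fn, attempts - 1)     // try again
      : Promise.reject(err));       // out of attempts: fail the whole chain
}

// Usage shape (openStream is assumed to return a Promise):
// retry(() => openStream(client, keywords), 3)
//   .then(stream => console.log('stream connected'))
//   .catch(err => console.error('all attempts failed:', err.message));
```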

Can you see how powerful Promises are now? Imagine how coupled the retry code would be with the stream initialization logic. Again, not going to show you what it looked like before for fear of the copy-n-paste police.

Don’t Ignore Error Catching Simply Because the Code Validates!

At first, as I was re-writing blocks of code into Promise-savvy statements, I was getting a lot of unhandled promise rejection warnings at runtime.

The problem was that I didn’t have ‘.catch’ statements anywhere in the Promise chain. Node.js treated the code as valid right up until the error occurred at runtime. Bad. Really bad of me. Glad that Node 8 warned me.

You don’t have to write ‘.catch’ after every Promise, particularly if you’re returning Promises through functions, so long as the error is handled in at least one place up the Promise chain hierarchy. The Promise model provides you granularity on which errors you want to bubble up.

For instance, in the stream code above, I DON’T bubble up individual event/tweet errors, but I DO allow stream initialization errors to bubble up to the calling retry code. I can also selectively escalate individual stream event errors into a bigger problem if the message back from Twitter is something like ‘420 Enhance Your Calm’, which essentially means “back the fuck off, asshole”.
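In miniature, that selective bubbling looks like this (step names are made up; the point is that a rejection skips every ‘.then’ until the first ‘.catch’):

```javascript
// A rejection propagates past intermediate .then handlers untouched.
function stepOne() {
  return Promise.reject(new Error('boom'));  // simulated failure
}

function pipeline() {
  return stepOne()
    .then(v => v * 2)    // skipped: the chain is already rejected
    .then(v => v + 1);   // skipped too; no local .catch here
}

// One .catch at the level that cares is enough to handle it.
pipeline().catch(err => console.error('handled once:', err.message));
```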

You CAN Join/Wait for Parallel Executing Promises

The Promise chain lets us string together as many sequential steps as we want via the ‘.then’ handler. But what about waiting for parallel threads of code?

Using the ‘Promise.all’ function, we can execute separate Promises in parallel, then wait for all of them in a parent async function with the ‘await’ keyword. For example:
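A self-contained sketch (using timers as stand-ins for real asynchronous work):

```javascript
// Two Promises run concurrently; Promise.all fulfills when both are done,
// preserving the order of the input array regardless of completion order.
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

async function run() {
  const [a, b] = await Promise.all([
    delay(50, 'first result'),     // finishes second...
    delay(20, 'second result')     // ...finishes first
  ]);
  console.log(a, b);               // runs only after BOTH have settled
  return [a, b];
}

run();
```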

Within an async function, the above code will wait for both Promises to complete before printing the final statement to the console.

I can tell that by now you’re really getting the power of decoupling code expression from code execution. I told you that you’d want to start using Promises. As such, I suggest reading up on the links at the end of the article.

Hidden Lesson: Don’t Bury Your Head in the Sand!

My takeaway from all this learning is that I should have been applying lessons from my Java 7 work to other areas like Node.js. Promises aren’t a new idea (see Java Futures and async in C#). If a pattern emerges in one language or framework, it very likely already exists in others. If not, find people and contribute to the solution yourself.

If you run into issues, ping me up on Twitter or LinkedIn, and I’ll do my best to help in a timely manner.

More reading: