class Mixpanel::Consumer

A Consumer receives messages from a Mixpanel::Tracker and sends them elsewhere: usually to Mixpanel's analytics services, but it can also enqueue them for later processing, log them to a file, or do whatever else you find useful.

You can provide your own consumer to your Mixpanel::Trackers, either by passing in an argument with a send! method when you construct the tracker, or by passing a block to Mixpanel::Tracker.new:

tracker = Mixpanel::Tracker.new(YOUR_MIXPANEL_TOKEN) do |type, message|
    # type will be one of :event, :profile_update, :group_update or :import
    @kestrel.set(ANALYTICS_QUEUE, [type, message].to_json)
end
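
The other route mentioned above, an object that responds to send!, might look like the following sketch. LogConsumer and its io argument are hypothetical names, not part of the library. The only contract assumed is a send!(type, message) method that receives the message as a JSON string, as in the block example.

```ruby
require 'json'
require 'stringio'

# Hypothetical consumer: writes one JSON line per message to an IO
# instead of sending anything to Mixpanel.
class LogConsumer
  def initialize(io)
    @io = io
  end

  # Same signature as the documented send! method.
  def send!(type, message)
    @io.puts({ 'type' => type.to_s, 'message' => JSON.parse(message) }.to_json)
  end
end

log = StringIO.new
consumer = LogConsumer.new(log)
consumer.send!(:event, '{"data":{"event":"Signup"}}')
```

One way to wire such an object in, using only the block form shown above, would be Mixpanel::Tracker.new(YOUR_MIXPANEL_TOKEN) { |type, message| consumer.send!(type, message) }.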

You can also instantiate the library consumers yourself and use them wherever you like. For example, the worker that consumes the above queue might look like this:

mixpanel = Mixpanel::Consumer.new
while true
    message_json = @kestrel.get(ANALYTICS_QUEUE)
    mixpanel.send!(*JSON.load(message_json))
end
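
One detail of this queue round trip is worth spelling out: a Symbol type serialized with to_json comes back from JSON.load as a String. The send! source listed further down calls type.to_sym, which is why the worker can pass the decoded values straight through. A minimal sketch with an invented payload:

```ruby
require 'json'

# What the tracker block would enqueue (sample payload, not real data).
queued = [:event, '{"data":{"event":"Signup"}}'].to_json

# What the worker gets back after decoding.
type, message = JSON.load(queued)

type_class = type.class   # String: the Symbol did not survive JSON
restored   = type.to_sym  # the same conversion send! applies
```

The message itself stays a JSON string through the round trip, which is the form send! expects.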

Mixpanel::Consumer is the default consumer. It sends each message directly to Mixpanel as soon as the message is received.

Public Class Methods

new(events_endpoint=nil, update_endpoint=nil, groups_endpoint=nil, import_endpoint=nil)

Create a Mixpanel::Consumer. If you provide endpoint arguments, they will be used instead of the default Mixpanel endpoints. This can be useful for proxying, debugging, or if you prefer not to use SSL for your events.

# File lib/mixpanel-ruby/consumer.rb, line 61
def initialize(events_endpoint=nil,
               update_endpoint=nil,
               groups_endpoint=nil,
               import_endpoint=nil)
  @events_endpoint = events_endpoint || 'https://api.mixpanel.com/track'
  @update_endpoint = update_endpoint || 'https://api.mixpanel.com/engage'
  @groups_endpoint = groups_endpoint || 'https://api.mixpanel.com/groups'
  @import_endpoint = import_endpoint || 'https://api.mixpanel.com/import'
end
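
For the proxying case, the four endpoint arguments could be derived from a single base URL. This is only a sketch: the proxy host is a made-up placeholder, and the path names simply copy the defaults in the listing above.

```ruby
require 'uri'

# Hypothetical proxy base URL; replace with your own host.
PROXY_BASE = 'https://mixpanel-proxy.example.com/'

# Build the URLs in the order the constructor takes them:
# events, update, groups, import.
endpoints = %w[track engage groups import].map do |path|
  URI.join(PROXY_BASE, path).to_s
end
```

The resulting array could then be splatted into the constructor as Mixpanel::Consumer.new(*endpoints). Note that the request listing on this page sets use_ssl = true unconditionally, so with that version of the code the proxy would need to accept HTTPS.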

Public Instance Methods

request(endpoint, form_data)

request takes an HTTP or HTTPS endpoint URL and a Hash of data to post to that URL. It should return a pair of

response code, response body

as the result of the request. The response code should be nil if the request never receives a response for some reason.

# File lib/mixpanel-ruby/consumer.rb, line 126
def request(endpoint, form_data)
  uri = URI(endpoint)
  request = Net::HTTP::Post.new(uri.request_uri)
  request.set_form_data(form_data)

  client = Net::HTTP.new(uri.host, uri.port)
  client.use_ssl = true
  client.open_timeout = 10
  client.continue_timeout = 10
  client.read_timeout = 10
  client.ssl_timeout = 10

  Mixpanel.with_http(client)

  response = client.request(request)
  [response.code, response.body]
end
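
To make the return contract concrete, here is a sketch of how a caller might interpret the pair. It follows the checks that send! performs on this page, but accepted? is a hypothetical helper, and it returns false where send! would raise.

```ruby
require 'json'

# Returns true only for a 200 response whose JSON body has "status" == 1.
# A nil code (no response received) counts as failure because nil.to_i is 0.
def accepted?(response_code, response_body)
  return false unless response_code.to_i == 200

  parsed = JSON.parse(response_body.to_s)
  parsed.is_a?(Hash) && parsed['status'] == 1
rescue JSON::ParserError
  # Simplification: send! raises ServerError here instead.
  false
end

ok      = accepted?('200', '{"status":1,"error":null}')
dropped = accepted?(nil, nil)
```

Because the code is compared after to_i, an override of request may return it as either a String (as Net::HTTP does) or an Integer.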

send(type, message)

This method was deprecated in release 2.0.0; please use send! instead.

# File lib/mixpanel-ruby/consumer.rb, line 114
def send(type, message)
    warn '[DEPRECATION] send has been deprecated as of release 2.0.0, please use send! instead'
    send!(type, message)
end

send!(type, message)

Send the given string message to Mixpanel. Type should be one of :event, :profile_update, :group_update or :import, which determines the endpoint.

#send! sends messages to Mixpanel immediately on each call. To reduce the overall bandwidth you use when communicating with Mixpanel, you can use Mixpanel::BufferedConsumer instead.

# File lib/mixpanel-ruby/consumer.rb, line 77
def send!(type, message)
  type = type.to_sym
  endpoint = {
    :event => @events_endpoint,
    :profile_update => @update_endpoint,
    :group_update => @groups_endpoint,
    :import => @import_endpoint,
  }[type]

  decoded_message = JSON.load(message)
  api_key = decoded_message["api_key"]
  data = Base64.encode64(decoded_message["data"].to_json).gsub("\n", '')

  form_data = {"data" => data, "verbose" => 1}
  form_data.merge!("api_key" => api_key) if api_key

  begin
    response_code, response_body = request(endpoint, form_data)
  rescue => e
    raise ConnectionError.new("Could not connect to Mixpanel, with error \"#{e.message}\".")
  end

  result = {}
  if response_code.to_i == 200
    begin
      result = JSON.parse(response_body.to_s)
    rescue JSON::JSONError
      raise ServerError.new("Could not interpret Mixpanel server response: '#{response_body}'")
    end
  end

  if result['status'] != 1
    raise ServerError.new("Could not write to Mixpanel, server responded with #{response_code} returning: '#{response_body}'")
  end
end
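
The encoding step in the middle of send! can be tried on its own. The sketch below uses an invented message in the shape the listing decodes (a JSON string with a "data" object and an optional "api_key") and checks that the form field decodes back to the same object. It uses Base64.strict_encode64 as a stand-in for the encode64 plus gsub combination in the listing.

```ruby
require 'base64'
require 'json'

# Sample message; the event name and properties are invented.
message = { 'data' => { 'event' => 'Signup', 'properties' => { 'plan' => 'pro' } } }.to_json

decoded = JSON.parse(message)

# strict_encode64 emits no newlines, matching encode64 + gsub("\n", '').
data = Base64.strict_encode64(decoded['data'].to_json)

form_data = { 'data' => data, 'verbose' => 1 }
form_data['api_key'] = decoded['api_key'] if decoded['api_key']

# Decoding the field recovers the original "data" object.
round_trip = JSON.parse(Base64.decode64(form_data['data']))
```

Since this sample carries no "api_key", the resulting form data contains only the "data" and "verbose" fields.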