Analytics/Infrastructure/Logging Solutions Overview
This page is meant as a workspace to evaluate replacements for udp2log. The options discussed here will be used as the main data firehose into the Analytics Kraken Cluster. This is an overview meant to summarize the research that the Analytics team has done, and will be used to start a discussion with ops.
Currently the Wikimedia Foundation uses a custom logging daemon udp2log to transport logs from several different sources to 3 (as of July 2012) dedicated logging machines. Any host can send UDP sockets to a udp2log daemon. The udp2log daemon forks several 'filter' processes that are each responsible for filtering and transforming all incoming logs. Usually these logs are then saved to local log files, but they can also then be sent out over UDP again to another destination using log2udp.
While udp2log is a simple C program and generally works as it was designed, there are problems with continuing to use it. The most obvious is that it does not scale. udp2log daemons are mainly being used to collect web access logs for all Wikimedia sites. The traffic for these sites is so great that sampling on the incoming log streams needs to be done. None of the 3 current log machines has the capacity to write unsampled log streams to disk, let alone enough storage space to keep these logs. udp2log was not designed as a distributed solution to this problem.
The Analytics team is tasked with building a distributed analytics cluster that can intake and store unsampled logs from any number of sources, and then subsequently do stream and batch processing on this data. We could attempt to enhance udp2log so that it works in a more distributed fashion. However, this problem has already been solved by some very smart people, so we see no need to spend our own resources solving this problem. So so so! What can we use instead?
- udp2log processes must always be running or we lose data!
- Each udp2log receiver must receive and process the entirety of the webrequest log stream.
Distributed Logging Solutions 
The following sections provide an overview of the logging solutions we are considering.
Key Questions for All Systems 
- How do clients find out about servers? (Bad: static configuration. Good: zk, pub-sub.)
- What kind of configuration is there for routing messages? (ex. brokers route messages; point-to-point; multicast.)
Failure and Recovery 
- What happens when an aggregation endpoint fails? Does the system support failover for aggregation endpoints? What kind of configuration is there for local buffering?
- Does the system guarantee exactly once delivery, or at least once delivery?
- Does the system support trees of aggregation? If so, is this chaining DC-aware, or do we have to build the awareness into our design/config?
- Local logs should be configurably durable. What options are there for automatic file cleanup/deletion?
- If the system ensures delivery of queued messages via durability, using local log or buffer files, then log rotation is a must; rotation must be per-minute granular (though we’ll probably use a larger window, this is a reasonable minimal bucket).
- A generic term for a single piece of data, usually a log line. (A.K.A. event, or log.)
- This is usually the original source of a message. It can also be used to refer to any upstream source of messages, but for our purposes will be used to identify an original message source (e.g. squid, varnish, application, etc.). (A.K.A. source.)
- The final destination of a message. This can also refer to any downstream consumer in a chain, but in this document will refer to the final destination. (A.K.A. sink.)
- Any daemon process that takes part in the passing of messages within a given logging or message system. e.g. a Scribe server, a Flume collector, a Kafka broker.
- Durable messages are persisted in permanent storage and will survive server failure.
- A reliable system guarantees that each message will be delivered at least once.
Feature Comparison 
We've put together a table comparing features of quite a few options. Below is a summary of the ones we are seriously considering. See this spreadsheet for a more complete comparison.
|Purpose||Scalable and reliable file logging||HDFS logging||Distributed message queue / log aggregation / activity stream|
|Notes||+Twitter has done some work for remote discovery.||*Reliability level is configurable.||*Relies on OS file cache for persistance, so durability is dependent on filesystem.|
Kafka is basically a big distributed log buffer. Its purpose is somewhere between a distributed message queue (like RabbitMQ, Kesterl, ActiveMQ, etc.) and distributed logging (Flume, Scribe, Fluentd, etc.). It does not conform to the usual message queue contract, in that messages cannot be deleted (or marked for deletion) by consumers.
Data is stored simply as files and memory usage is left up to OS pagecache. This keeps message data out of JVM memory. The Kafka architects note that they modeled this after Varnish's pagecache-centric design.
Messages are stored on brokers in files and indexed by file byte offset. Messages are ordered by arrival timestamp as determined by the broker, and no secondary indexing (or contiguous log sequencing) is provided. This allows for easy sequential file access and efficient disk scans, even for large ranges. Files are sent to consumers over network via
sendfile() (provided by
FileChannel.transferTo in Java), which avoids extra data copying between spaces (kernel, disk, user, network, etc).
Tracking state of consumed messages is left up to the consumer, and is coordinated by ZooKeeper (or another persistent store, if you like). Load balancing and distributed message partitioning is also coordinated by ZooKeeper. There is no single master; all Kafka brokers are peers. This means that by default, all messages are randomly distributed amongst all ZooKeeper registered brokers. A producer may set the
broker.list property to manually choose a set of brokers to use, if necessary.
- Kafka runs on the JVM and keeping data in the heap of a garbage collected language isn't wise. There are a couple of reasons for this. One is the GC overhead of continually scanning your in-memory cache, the other is the object overhead (in java a hash table of small objects tends to be mostly overhead not data).
- Modern operating systems reserve all free memory as "pagecache". Basically contiguous chunks of memory that soaks up reads and writes to disk. The nice thing about this is that on a 32GB machine you get access to virtually all of that memory automatically without having to worry about the possibility of running out of memory and swapping.
- Unix has optimizations to allow you to directly write data in pagecache to a socket without any additional copying (aka sendfile). Any data sent on a socket has to cross the process/kernel memory boundary any way. This means if you keep data in your process, and need to deliver that data to multiple consumers you need to recopy it into kernel space, buffering on both sides, each time. This approach gets rid of all the buffering and copying and uses and single structure.
Kafka makes no reliability guarantees at the producer or broker (agent) level. Consumers are responsible (via ZooKeeper, not manually) for saving state about what has been consumed. Brokers always save buffered messages for a configurable amount of time. LinkedIn keeps a week of data on each broker. This means that if a consumer fails, and we notice it within a week, the consumer should be able to start back up and continue consuming messages from where it left off.
However, LinkedIn built an audit trail to track data loss. Each of their producers and consumers emit statistics for how many messages they have processed for a given topic in a given time period into a special Kafka topic for audit purpose. This topic then contains the sum of messages processed for each topic at each tier (producer and consumer in the hierarchy) for a given time window (e.g. 10 minutes). If the counts at each level match, then it is assumed no data loss has happened. Linked in states “We are able to run this pipeline with end-to-end correctness of 99.9999%. The overwhelming majority of data loss comes from the producer batching where hard kills or crashes of the application process can lead to dropping unsent messages.”
How to get data in 
- tail -f /var/log/squid/access.log | kafka-console-producer.sh --topic squid
- Already Kafka clients built for these languages: C++, C#, Go, PHP, Python, Ruby, so we can modify sources to send directly to Kafka (like we do for udp2log)
- publish / subscribe
- Efficient memory management (JVM memory usage low) by leveraging OS pagecache + sendfile().
- Clients written in many languages.
- Brokers do not keep state.
- Uses ZooKeeper for configuration and failover.
- Consumer parallelization
- No masters, all brokers are peers.
- Scala, so runs in JVM.
- No per message reliability. Have to monitor data loss.
- No built in 'data flows' like Flume. Have to set these up via subscibed consumers.
See Also 
- Kafka Design Document
- Why is Kafka faster than other Message Queues?
- Really good presentation from LinkedIn on Kafka's architecture.
- Kafka whitepaper
Scribe is a distributed pushed based logging service. It guarantees reliability and availability by using a tree hierarchy of scribe servers combined with local failover file buffers.
Scribe's message durability is provided by its
buffer store type. A
buffer store has a
<secondary> store, each of which can be any store type. Typically, if the Scribe host in question is an aggregator, the
<primary> store is a
network store that simply forwards messages over to the next Scribe server in the chain. The
<secondary> is then configured as a simple file store. If the
network store goes down and is unavailable, logs will be buffered in the
<secondary> store until the
<primary> comes back online. The buffered logs will then be re-read from disk and sent over to the
multi store tells Scribe to send its messages to two different stores. This allows for fanout to multiple endpoints, but only with two branches at a time. It is possible to set up each of the two branches as buffered stores, each using the configuration (
<secondary> local file buffer) described above.
network stores, Scribe essentially achieves message replication. If a one of the two
network stores in an immediate hierarchy explodes, the second will still continue receiving messages. In this setup, the downstream consumers need to be configured in such a way as to avoid consuming duplicate logs, but this is the only way to guarantee 100% message durability with Scribe.
How to get data in 
- Tail log files into scribe_cat or use scribe_tail. This has the disadvantage of (probably) being pretty inefficient.
- Modify source software to use scribe client code and log directly to scribe. This is similar to what we are doing now for udp2log.
- Simple configuration files
- Generally works well
- Uses thrift so clients are available for any language.
- No longer an active project, Facebook is phasing this out
- HDFS integration might be buggy
- Difficult to package
- Static configuration
- No publish / subscribe
- Routing topologies limited. Can only do two branch trees.
See Also 
- Is Scribe Still Maintained?
- Why did Facebook develop a new logging service?. In particular check out Sam Rash's answer and presentation on Calligraphus and Facebook's data freeway.
- What's Up Scribe? - Otto's blog post on Scribe packaging and summary of Calligraphus.
Flume is a distributed logging service built by Cloudera primarily as a reliable way of getting stream and log data into HDFS. Its pluggable architecture supports any consumer.
Data Flows 
Flume is configured via 'data flows'. “A data flow describes the way a single stream of data is transferred and processed from its point of generation to its eventual destination.” Data flows are a centralized high level way of expressing how data gets from producer machine to consumer machines. These flows are configured at the Flume Master, and do not need to be configured at each producer. Each producer needs to run a Flume agent. The Flume agents are then configured to read data from the producers into the data flow by the Flume Master.
Data flows are expressed in static config files, or given as commands to a Flume CLI or web GUI. In general, a flow is of the form
<node_name> : <source> | <sink>
Example data flow:
squidA : tail("/var/log/squid/access.log") | agentE2EChain("flumeA","flumeB"); squidB : tail("/var/log/squid/access.log") | agentE2EChain("flumeB","flumeA"); flumeA : collectorSource | collectorSink("hdfs://..."); flumeB : collectorSource | collectorSink("hdfs://...");
In this example, two squid nodes are configured to tail their access logs into an End to End agent chain with two node failover.
squidA's access logs will be sent to node
flumeA defines a
collectorSource as a source, which simply tells node
flumeA that it should act as a Flume Collector. The logs that arrive to
collectorSource will be written to HDFS.
flumeB work similarly. Notice that each of the
agentE2Echains specify two nodes. The additional nodes are failover nodes. If
flumeA goes down, then
squidA will be notified by the Squid Master (via Zookeeper), and will begin sending its logs to
flumeA comes back online.
Note that this setup explicitly specifies failover chains. Flume can take care of automatically configuring the failover chains, however there is a note in the documentation that says automatic failover chains do not work with multiple Flume Masters.
Flume's centralized configuration makes it easy to modify data flows on the fly, without having to change config files on producers.
Each data flow's reliability level is configurable.
Uses write-ahead-log to buffer event until final sink has ACKed receiving and storing the event. ACKs are handled through master nodes, rather than up through the chain. This guarantees that that a message will reach the end of a data flow at least once. However, if a sink somewhere blocks for some reason, and an upstream source times out waiting for an ACK, the upstream source will resend the message. This can cause duplicate messages to be stored. Since this requires Flume Master coordination, this is the most inefficient option.
Store on failure 
Works like scribe’s buffer store. Agent requires ACK from immediate downstream receiver. If downstream receiver is down, the agent stores data locally until downstream comes back up, or until a failover downstream is selected and event is ACKed. This also can cause duplicate messages for the same reason.
Best effort 
This mode sends a message to the receiver, without any acknowledgement. This mode is good only if you need high throughput but not high reliability.
How to get data in 
squidA : tail("/var/log/squid/access.log") | agentE2EChain("flumeA","flumeB");
CustomLog "|flume node_nowatch -1 -s -n apache -c \'apache:console|agentDFOSink(\"collector\");\'" common
- Centralized configuration
- Highly available (with multi Flume Master setup. )
- Backed by Cloudera, this has been the logs -> HDFS solution of choice.
- source agents can track logrotated files, HDFS sinks write file names with configurable granularity.
- Java, using Flume only would require running JVM on production servers (squids, varnish, etc.).
- A slow sink could cause a large backlog.
- Development of Flume has come to a stop, Flume-NG is the next release, and is not production ready.
- No publish/subscribe feature.
See Also 
- Flume User Guide.
- Flume Cookbook.
- Issues to be aware of when using Flume in production.
- blog post about production trouble with Flume. (This guy might not know what he was doing though.)
- publish/subscribe upcoming in Flume-NG
Local Message Buffering 
One of the main goals here is to reliably get all messages from producers to their final destination in Kraken (or elsewhere). The originating softwares (squid, varnish, nginx, apache, lucene, etc. etc.) will not have reliable write ahead log buffering built in. Many of these producers can only be configured to write directly to local log files. Some can be configured or modified to send to a logging agent instead.
The solution that requires the least modification to production nodes is to have these producers fire off their messages to the remote logging agents. This is how udp2log works now. Each production producer opens up a socket to udp2log daemons and sends messages. If a udp2log instance is down, that instance will miss messages from the producers. If we simply run a cluster of remote logging agents and push messages to them from producers, we will still have this problem, independent of the distributed logging solution we choose.
A way to solve this problem is to run a bufferable logging agent on each of the production source machines. Each of the producers would log to a localhost agent, without having to go over the network. The localhost agent would be configured to forward incoming messages to the remote logging cluster. If the downstream logging cluster is not available, the localhost agent will buffer logs locally. Once the downstream is available again, the localhost agent would notice and begin sending its locally buffered logs over to it.
Production producer machines are one of the most sensitive parts of this new system. They are responsible for serving all Wikimedia websites, and any changes made to them, especially the introduction of new software, has the potential to cause downtime. Ops is concerned with this possibility, especially relating to the JVM based solutions proposed here. The JVM can be a memory and resource hog if not configured properly. It is not clear that we have the JVM maintenance expertise to ensure that the production nodes will continue to function as normal alongside of a JVM running either Flume or Kafka.
Scribe might be a good way to allay these concerns. It has the downsides of being an unmaintained and relatively feature lacking product. But, it was originally created by Facebook as a simple and efficient distributed logger, and even better, it is C++, not Java.
Kafka could also be used here. Even though it does run in a JVM, it has been built to leverage OS memory optimizations, and might not have as much of an impact as a traditional Java application that relies on JVM garbage collection.
Do we really need it? 
That said, it might not be worthwhile to pursue this option. As far as we can tell, none of the big data users of these softwares are concerned with local message buffering. Since all of these logging solutions are distributed with automatic failover built in, the only reason we would possibly lose messages is if a large portion of logging machines go down, or if there are network failures. In either case, the log sources won't be able to send their messages to the log servers. During a network or catastrophic log server failure, messages will be lost. However, the likelihood of this happening is low, and running new daemon processes to deal with local message buffering on the log sources might be more risky than not. An alternative safety measure would be to have each log source also log locally to files with a short log rotation period. In the case of a catastrophic logging tier failure, we could manually replay missed logs from each log source's local log files.
The analytics team would like to recommend Kafka as our logging solution of choice. See the Request Logging page for a full summary.
- Log sources are configured/modified to log to Kafka using Kafka client code.
- Kafka broker (and Zookeeper) cluster nodes accept logs.
- Kafka Consumers within Kraken cluster consume logs and feed into bundler (Storm)