AI News, Machine Learning Netflix Style with Xavier Amatriain Recorded at:
- On Friday, June 8, 2018
I mean, Hadoop is going to be helpful for one set of problems, and of course at Netflix we do use Hadoop, and some of our solutions are based on Hive or Pig scripts that run on Hadoop. But one important thing to remember about Hadoop is that it provides solutions for the data distribution problem, or distributed data computing, in an offline or batch-mode setting, and that is just one part of the problem. It is an interesting one, because some of your Big Data problems can be addressed that way: think about the kind of processes that you can run overnight. I like to use the metaphor that, when your people are sleeping, you can crunch some numbers and run a MapReduce job on Hadoop, and the next day when they wake up you have something ready for them. That is a good thing to do on the Hadoop side of things.
If the user starts watching a movie or TV show, you know, we usually have half an hour or two hours in which you could update things. They don’t need to happen online in a few milliseconds; they could happen in a few minutes. You can recompute and rebuild your models, and you can recompute your recommendations, in a different way than would happen through the Big Data offline Hadoop pipeline that is going to be happening overnight and is going to be Big Data crunching.
MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. A MapReduce program is composed of a map procedure (or method), which performs filtering and sorting (such as sorting students by first name into queues, one queue for each name), and a reduce method, which performs a summary operation (such as counting the number of students in each queue, yielding name frequencies).
The model is a specialization of the split-apply-combine strategy for data analysis. It is inspired by the map and reduce functions commonly used in functional programming, although their purpose in the MapReduce framework is not the same as in their original forms. The key contributions of the MapReduce framework are not the actual map and reduce functions (which, for example, resemble the 1995 Message Passing Interface standard's reduce and scatter operations), but the scalability and fault-tolerance achieved for a variety of applications by optimizing the execution engine.
MapReduce is a framework for processing parallelizable problems across large datasets using a large number of computers (nodes), collectively referred to as a cluster (if all nodes are on the same local network and use similar hardware) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware).
While this process can often appear inefficient compared to algorithms that are more sequential (because multiple instances of the reduction process must be run), MapReduce can be applied to significantly larger datasets than a single 'commodity' server can handle: a large server farm can use MapReduce to sort a petabyte of data in only a few hours. The parallelism also offers some possibility of recovering from partial failure of servers or storage during the operation: if one mapper or reducer fails, the work can be rescheduled, assuming the input data is still available.
Another way to look at MapReduce is as a 5-step parallel and distributed computation: These five steps can be logically thought of as running in sequence – each step starts only after the previous step is completed – although in practice they can be interleaved as long as the final result is not affected.
The Reduce function is then applied in parallel to each group, which in turn produces a collection of values in the same domain: Reduce(k2, list (v2)) → list(v3) Each Reduce call typically produces either one value v3 or an empty return, though one call is allowed to return more than one value.
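The map, group, and reduce phases described above can be sketched in a few lines of single-process Python. This is only an illustration of the programming model, not of a distributed engine: the function names (`map_fn`, `shuffle`, `reduce_fn`, `run_mapreduce`) and the sample student records are hypothetical, and the example reuses the name-frequency case mentioned earlier.

```python
from collections import defaultdict


def map_fn(_key, student_name):
    # Map(k1, v1) -> list(k2, v2): emit the first name with a count of 1.
    first_name = student_name.split()[0]
    return [(first_name, 1)]


def shuffle(pairs):
    # Group all intermediate values by key (one "queue" per name).
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_fn(_key, values):
    # Reduce(k2, list(v2)) -> list(v3): summarise each group as one count.
    return [sum(values)]


def run_mapreduce(records):
    # Run the three phases in sequence; a real engine would run them in parallel.
    intermediate = []
    for key, value in records:
        intermediate.extend(map_fn(key, value))
    return {k: reduce_fn(k, vs) for k, vs in shuffle(intermediate).items()}


# Hypothetical input: (record id, student name).
students = [(1, "Ana Lopez"), (2, "Ben Okafor"), (3, "Ana Silva")]
name_frequencies = run_mapreduce(students)
```

Each reduce call here returns a one-element list, matching the "typically produces one value" case in the text.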
In SQL, such a query could be expressed as: Using MapReduce, the K1 key values could be the integers 1 through 1100, each representing a batch of 1 million records, the K2 key value could be a person's age in years, and this computation could be achieved using the following functions: The MapReduce system would line up the 1100 Map processors, and would provide each with its corresponding 1 million input records.
If we did not add the count of the records, the computed average would be wrong, for example: If we reduce files #1 and #2, we will have a new file with an average of 9 contacts for a 10-year-old person ((9+9+9+9+9)/5): If we reduce it with file #3, we lose the count of how many records we've already seen, so we end up with an average of 9.5 contacts for a 10-year-old person ((9+10)/2), which is wrong.
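The arithmetic behind this point can be checked with a small sketch. The `combine` helper is hypothetical, and the record count of 1 for file #3 is an assumption (the text only gives its average of 10); the five records averaging 9 come from the example above.

```python
def combine(partial_a, partial_b):
    # Each partial result is (average, count) for one age group.
    avg_a, n_a = partial_a
    avg_b, n_b = partial_b
    total = n_a + n_b
    # Weight each average by how many records it summarises.
    return ((avg_a * n_a + avg_b * n_b) / total, total)


files_1_and_2 = (9.0, 5)  # five records averaging 9 contacts
file_3 = (10.0, 1)        # assumed: a single record with 10 contacts

correct_avg, count = combine(files_1_and_2, file_3)

# Dropping the counts treats both partial averages as equally weighted.
naive_avg = (files_1_and_2[0] + file_3[0]) / 2
```

Carrying the count alongside the average is what lets partial results be merged in any order without changing the final answer.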
MapReduce is useful in a wide range of applications, including distributed pattern-based searching, distributed sorting, web link-graph reversal, Singular Value Decomposition, web access log stats, inverted index construction, document clustering, machine learning, and statistical machine translation.
Moreover, the MapReduce model has been adapted to several computing environments like multi-core and many-core systems, desktop grids, multi-cluster, volunteer computing environments, dynamic cloud environments, mobile environments, and high-performance computing environments. At Google, MapReduce was used to completely regenerate Google's index of the World Wide Web.
It replaced the old ad hoc programs that updated the index and ran the various analyses. Development at Google has since moved on to technologies such as Percolator, FlumeJava and MillWheel that offer streaming operation and updates instead of batch processing, to allow integrating 'live' search results without rebuilding the complete index. MapReduce's stable inputs and outputs are usually stored in a distributed file system.
David DeWitt and Michael Stonebraker, computer scientists specializing in parallel databases and shared-nothing architectures, have been critical of the breadth of problems that MapReduce can be used for. They called its interface too low-level and questioned whether it really represents the paradigm shift its proponents have claimed it is. They challenged the MapReduce proponents' claims of novelty, citing Teradata as an example of prior art that has existed for over two decades.
They also compared MapReduce programmers to CODASYL programmers, noting both are 'writing in a low-level language performing low-level record manipulation.' MapReduce's use of input files and lack of schema support prevents the performance improvements enabled by common database system features such as B-trees and hash partitioning, though projects such as Pig (or PigLatin), Sawzall, Apache Hive, YSmart, HBase and Bigtable are addressing some of these problems.
DeWitt and Stonebraker subsequently published a detailed benchmark study in 2009 comparing performance of Hadoop's MapReduce and RDBMS approaches on several specific problems. They concluded that relational databases offer real advantages for many kinds of data use, especially on complex processing or where the data is used across an enterprise, but that MapReduce may be easier for users to adopt for simple or one-time processing tasks.
Big Data Explained
While many Big Data technologies are mature enough to be used for mission-critical, production use cases, the field is still nascent in some regards.
Online Big Data refers to data that is created, ingested, transformed, managed and/or analyzed in real-time to support operational applications and their users.
This includes a vast array of applications, from social networking news feeds, to analytics to real-time ad servers to complex CRM applications.
There are three general types of licenses for Big Data software technologies: For many Fortune 1000 companies, regulations and internal policies around data privacy limit their ability to leverage cloud-based solutions.
The nation’s top engineers and data scientists often flock to companies like Google and Facebook, which are known havens for the brightest minds and places where one will be exposed to leading edge technology.
By offering developers the opportunity to work on tough problems, and by using a technology that has strong developer interest, a vibrant community, and an auspicious long-term future, organizations can attract the brightest minds.
Furthermore, technologies that have strong developer appeal tend to make for more productive teams who feel they are empowered by their tools rather than encumbered by poorly-designed, legacy technology.
In this context, agility comprises three primary components: MongoDB’s ease of use, dynamic data model and open-source licensing model make it the most agile online Big Data solution available.
Organizations are constantly trying to standardize on fewer technologies to reduce complexity, to improve their competency in the selected tools and to make their vendor relationships more productive.
Organizations may find that a niche technology may be a better fit for a single project, but that a more general purpose tool is the better option for the organization as a whole.
The Log: What every software engineer should know about real-time data's unifying abstraction
I joined LinkedIn about six years ago at a particularly interesting time.
We were just beginning to run up against the limits of our monolithic, centralized database and needed to start the transition to a portfolio of specialized distributed systems.
This has been an interesting experience: we built, deployed, and run to this day a distributed graph database, a distributed search backend, a Hadoop installation, and a first and second generation key-value store.
Sometimes called write-ahead logs or commit logs or transaction logs, logs have been around almost as long as computers and are at the heart of many distributed data systems and real-time application architectures.
You can't fully understand databases, NoSQL stores, key-value stores, replication, Paxos, Hadoop, version control, or almost any software system without understanding logs.
In this post, I'll walk you through everything you need to know about logs, including what a log is and how to use logs for data integration, real-time processing, and system building.
A file is an array of bytes, a table is an array of records, and a log is really just a kind of table or file where the records are sorted by time.
Every programmer is familiar with another definition of logging—the unstructured error messages or trace info an application might write out to a local file using syslog or log4j.
This approach quickly becomes unmanageable when many services and servers are involved, and the purpose of logs quickly becomes that of an input to queries and graphs for understanding behavior across many machines, something for which English text in files is not nearly as appropriate as the kind of structured log described here.
To make this atomic and durable, a database uses a log to write out information about the records it will be modifying, before applying the changes to all the various data structures it maintains.
Oracle has productized the log as a general data subscription mechanism for non-Oracle data subscribers with its XStreams and GoldenGate, and similar facilities in MySQL and PostgreSQL are key components of many data architectures.
For example, a program whose output is influenced by the particular order of execution of threads or by a call to gettimeofday or some other non-repeatable thing is generally best considered as non-deterministic.
The purpose of the log here is to squeeze all the non-determinism out of the input stream to ensure that each replica processing this input stays in sync.
One of the beautiful things about this approach is that the time stamps that index the log now act as the clock for the state of the replicas—you can describe each replica by a single number, the timestamp for the maximum log entry it has processed.
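A minimal sketch of this idea, assuming a toy state machine whose state is a single integer: the `Replica` class, its operations, and the use of a list index as the log "timestamp" are all illustrative choices, not taken from any particular system.

```python
class Replica:
    """A deterministic state machine driven entirely by a shared log."""

    def __init__(self):
        self.state = 0
        self.applied = -1  # index of the last log entry processed

    def catch_up(self, log, upto=None):
        # Apply entries in log order, starting after the last one applied.
        end = len(log) if upto is None else upto
        for index in range(self.applied + 1, end):
            op, amount = log[index]
            if op == "add":
                self.state += amount
            elif op == "mul":
                self.state *= amount
            self.applied = index


# The log fixes one order for the inputs, so every replica sees the same sequence.
log = [("add", 5), ("mul", 3), ("add", 2)]

a, b = Replica(), Replica()
a.catch_up(log)
b.catch_up(log, upto=2)  # b lags behind a
lagging_state = b.state
b.catch_up(log)          # once b reaches the same entry, it holds the same state
```

The `applied` field is the single number that describes a replica: two replicas with the same value are guaranteed to hold the same state.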
For example, we can log the incoming requests to a service, or the state changes the service undergoes in response to request, or the transformation commands it executes.
Logical logging means logging not the changed rows but the SQL commands that lead to the row changes (the insert, update, and delete statements).
A slight modification of this, called the "primary-backup model", is to elect one replica as the leader and allow this leader to process requests in the order they arrive and log out the changes to its state from processing the requests.
With Paxos, this is usually done using an extension of the protocol called "multi-paxos", which models the log as a series of consensus problems, one for each slot in the log.
My suspicion is that our view of this is a little bit biased by the path of history, perhaps due to the few decades in which the theory of distributed computing outpaced its practical application.
I suspect we will end up focusing more on the log as a commoditized building block irrespective of its implementation, in the same way we often talk about a hash table without bothering to get into the details of whether we mean the murmur hash with linear probing or some other variant.
There is a sense in which the log is the more fundamental data structure: in addition to creating the original table you can also transform it to create all kinds of derived tables.
The magic of the log is that if it is a complete log of changes, it holds not only the contents of the final version of the table, but also allows recreating all other versions that might have existed.
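As a rough illustration of this duality, the sketch below replays a changelog to rebuild a table at any point in its history. The `table_at` helper, the sample records, and the convention that `None` marks a delete are assumptions made for the example.

```python
def table_at(changelog, version):
    # Replay the first `version` changes to rebuild the table as of that point.
    table = {}
    for key, value in changelog[:version]:
        if value is None:
            table.pop(key, None)  # None marks a delete in this sketch
        else:
            table[key] = value
    return table


# Hypothetical changelog of (key, new value) updates, in commit order.
changelog = [
    ("alice", "NYC"),
    ("bob", "SF"),
    ("alice", "LA"),
    ("bob", None),
]

final = table_at(changelog, len(changelog))
earlier = table_at(changelog, 2)
```

The table alone only holds `final`; the log can produce `final`, `earlier`, and every version in between.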
You will note that in version control systems, as in other distributed stateful systems, replication happens via the log: when you update, you pull down just the patches and apply them to your current snapshot.
In the remainder of this article I will try to give a flavor of what a log is good for that goes beyond the internals of distributed computing or abstract distributed computing models.
In each case, the usefulness of the log comes from the simple function that the log provides: producing a persistent, re-playable record of history.
You don't hear much about data integration in all the breathless interest and hype around the idea of big data, but nonetheless, I believe this mundane problem of "making the data available"
The base of the pyramid involves capturing all the relevant data, being able to put it together in an applicable processing environment (be that a fancy real-time query system or just text files and python scripts).
Once these basic needs of capturing data in a uniform way are taken care of it is reasonable to work on infrastructure to process this data in various ways—MapReduce, real-time query systems, etc.
It's worth noting the obvious: without a reliable and complete data flow, a Hadoop cluster is little more than a very expensive and difficult to assemble space heater.
In my experience, most organizations have huge holes in the base of this pyramid—they lack reliable complete data flow—but want to jump directly to advanced data modeling techniques.
In web systems, this means user activity logging, but also the machine-level events and statistics required to reliably operate and monitor a data center's worth of machines.
This data is at the heart of the modern web: Google's fortune, after all, is generated by a relevance pipeline built on clicks and impressions—that is, events.
The second trend comes from the explosion of specialized data systems that have become popular and often freely available in the last five years.
A data source could be an application that logs out events (say clicks or page views), or a database table that accepts modifications.
A batch system such as Hadoop or a data warehouse may consume only hourly or daily, whereas a real-time query system may need to be up-to-the-second.
Neither the originating data source nor the log has knowledge of the various data destination systems, so consumer systems can be added and removed with no change in the pipeline.
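One way to picture this decoupling is a log whose readers each track their own position. The `Log` and `Consumer` classes below are a hypothetical in-memory sketch, not the API of any real messaging system.

```python
class Log:
    """Append-only sequence of records; knows nothing about its readers."""

    def __init__(self):
        self.entries = []

    def append(self, record):
        self.entries.append(record)
        return len(self.entries) - 1  # offset of the new record


class Consumer:
    """Reads the log at its own pace by remembering its own offset."""

    def __init__(self, log):
        self.log = log
        self.offset = 0  # next offset to read

    def poll(self, max_records=None):
        end = len(self.log.entries)
        if max_records is not None:
            end = min(end, self.offset + max_records)
        batch = self.log.entries[self.offset:end]
        self.offset = end
        return batch


log = Log()
for page in ["home", "jobs", "profile"]:
    log.append({"event": "page_view", "page": page})

realtime = Consumer(log)
seen_by_realtime = realtime.poll()

batch = Consumer(log)  # added later, with no change to the producer or the log
first_batch = batch.poll(max_records=2)
```

Because the position lives in the consumer, a fast reader and a slow reader never interfere with each other or with the producer.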
The consumer system need not concern itself with whether the data came from an RDBMS, a new-fangled key-value store, or was generated without a real-time query system of any kind.
doesn't imply much more than indirect addressing of messages—if you compare any two messaging systems promising publish-subscribe, you find that they guarantee very different things, and most models are not useful in this domain.
That isn't the end of the story of mastering data flow: the rest of the story is around metadata, schemas, compatibility, and all the details of handling data structure and evolution.
One of the earliest pieces of infrastructure we developed was a service called databus that provided a log caching abstraction on top of our early Oracle tables to scale subscription to database changes so we could feed our social graph and search indexes.
Having little experience in this area, we naturally budgeted a few weeks for getting data in and out, and the rest of our time for implementing fancy prediction algorithms.
Worse, any time there was a problem in any of the pipelines, the Hadoop system was largely useless—running fancy algorithms on bad data just produces more bad data.
If we captured all the structure we needed, we could make Hadoop data loads fully automatic, so that no manual effort was expended adding new data sources or handling schema changes: data would just magically appear in HDFS, and Hive tables would automatically be generated for new data sources with the appropriate columns.
The idea is that adding a new data system—be it a data source or a data destination—should create integration work only to connect it to a single pipeline instead of each consumer of data.
This experience led me to focus on building Kafka to combine what we had seen in messaging systems with the log concept popular in databases and distributed system internals.
For a long time, Kafka was a little unique (some would say odd) as an infrastructure product—neither a database nor a log file collection system nor a traditional messaging system.
The similarity goes right down to the way partitioning is handled, data is retained, and the fairly odd split in the Kafka API between high- and low-level consumers.
For those not in the know, the data warehousing methodology involves periodically extracting data from source databases, munging it into some kind of understandable form, and loading it into a central data warehouse.
A data warehouse is a piece of batch query infrastructure which is well suited to many kinds of reporting and ad hoc analysis, particularly when the queries involve simple counting, aggregation, and filtering.
But having a batch system be the only repository of clean complete data means the data is unavailable for systems requiring a real-time feed—real-time processing, search indexing, monitoring systems, etc.
The incentives are not aligned: data producers are often not very aware of the use of the data in the data warehouse and end up creating data that is hard to extract or requires heavy, hard to scale transformation to get into usable form.
Of course, the central team never quite manages to scale to match the pace of the rest of the organization, so data coverage is always spotty, data flow is fragile, and changes are slow.
This means that as part of their system design and implementation they must consider the problem of getting data out and into a well structured form for delivery to the central pipeline.
The data warehouse team handles only the simpler problem of loading structured feeds of data from the central log and carrying out transformation specific to their system.
This point about organizational scalability becomes particularly important when one considers adopting additional data systems beyond a traditional data warehouse.
Worse, the ETL processing pipeline built to support database loads is likely of no use for feeding these other systems, making bootstrapping these pieces of infrastructure as large an undertaking as adopting a data warehouse.
By contrast, if the organization had built out feeds of uniform, well-structured data, getting any new system full access to all data requires only a single bit of integration plumbing to attach to the pipeline.
The typical approach to activity data in the web industry is to log it out to text files where it can be scraped into a data warehouse or into Hadoop for aggregation and querying.
Worse, the systems that we need to interface with are now somewhat intertwined—the person working on displaying jobs needs to know about many other systems and features and make sure they are integrated properly.
The job display page now just shows a job and records the fact that a job was shown along with the relevant attributes of the job, the viewer, and any other useful facts about the display of the job.
Each of the other interested systems—the recommendation system, the security system, the job poster analytics system, and the data warehouse—all just subscribe to the feed and do their processing.
Using a log as a universal integration mechanism is never going to be more than an elegant fantasy if we can't build a log that is fast, cheap, and scalable enough to make this practical at scale.
At LinkedIn we are currently running over 60 billion unique message writes through Kafka per day (several hundred billion if you count the writes from mirroring between datacenters).
Instead, the guarantees that we provide are that each partition is order preserving, and Kafka guarantees that appends to a particular partition from a single sender will be delivered in the order they are sent.
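The per-partition ordering guarantee can be illustrated with a toy partitioned log. Routing records by a hash of their key is an assumption of this sketch (the text above does not say how a partition is chosen), and `partition_for` is a hypothetical helper, not Kafka's actual partitioner.

```python
import zlib


def partition_for(key, num_partitions):
    # Deterministic hash so the same key always lands in the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 3
partitions = [[] for _ in range(NUM_PARTITIONS)]

# Hypothetical events from a single sender, in send order.
events = [
    ("user-1", "login"),
    ("user-2", "login"),
    ("user-1", "view"),
    ("user-1", "logout"),
]
for key, value in events:
    partitions[partition_for(key, NUM_PARTITIONS)].append((key, value))

# Within one partition, a key's events keep the order in which they were sent.
user1_partition = partitions[partition_for("user-1", NUM_PARTITIONS)]
user1_events = [value for key, value in user1_partition if key == "user-1"]
```

There is no ordering promise between different partitions, which is what allows each partition to be written and read independently.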
Batching occurs from client to server when sending data, in writes to disk, in replication between servers, in data transfer to consumers, and in acknowledging committed data.
The cumulative effect of these optimizations is that you can usually write and read data at the rate supported by the disk or network, even while maintaining data sets that vastly exceed memory.
If you are a fan of late 90s and early 2000s database literature or semi-successful data infrastructure products, you likely associate stream processing with efforts to build a SQL engine or "boxes and arrows"
There is no inherent reason you can't process the stream of data from yesterday or a month ago using a variety of different languages to express the computation.
Data collection at the time was inherently batch oriented: it involved riding around on horseback and writing down records on paper, then transporting this batch of records to a central location where humans added up all the counts.
These days, when you describe the census process one immediately wonders why we don't keep a journal of births and deaths and produce population counts either continuously or with whatever granularity is needed.
But as these processes are replaced with continuous feeds, one naturally starts to move towards continuous processing to smooth out the processing resources needed and reduce latency.
When data is collected in batches, it is almost always due to some manual step or lack of digitization or is a historical relic left over from the automation of some non-digital process.
Seen in this light, it is easy to have a different view of stream processing: it is just processing which includes a notion of time in the underlying data being processed and does not require a static snapshot of the data so it can produce output at a user-controlled frequency instead of waiting for the "end"
Companies building stream processing systems focused on providing processing engines to attach to real-time data streams, but it turned out that at the time very few people actually had real-time data streams.
Actually, very early in my career at LinkedIn, a company tried to sell us a very cool stream processing system, but since all our data was collected in hourly files at that time, the best application we could come up with was to pipe the hourly files into the stream system at the end of the hour!
The exception actually proves the rule here: finance, the one domain where stream processing has met with some success, was exactly the area where real-time data streams were already the norm and processing had become the bottleneck.
It turns out that the log solves some of the most critical technical problems in stream processing, which I'll describe, but the biggest problem that it solves is just making data available in real-time multi-subscriber data feeds.
The most interesting aspect of stream processing has nothing to do with the internals of a stream processing system, but instead has to do with how it extends our idea of what a data feed is from the earlier data integration discussion.
Indeed, using a centralized log in this fashion, you can view all the organization's data capture, transformation, and flow as just a series of logs and processes that write to them.
A stream processor need not have a fancy framework at all: it can be any process or set of processes that read and write from logs, but additional infrastructure and support can be provided for helping manage processing code.
If processing proceeds in an unsynchronized fashion it is likely to happen that an upstream data producing job will produce data more quickly than another downstream job can consume it.
One might, for example, want to enrich an event stream (say a stream of clicks) with information about the user doing the click—in effect joining the click stream to the user account database.
Invariably, this kind of processing ends up requiring some kind of state to be maintained by the processor: for example, when computing a count, you have the count so far to maintain.
This gives us exactly the tool to be able to convert streams to tables co-located with our processing, as well as a mechanism for handling fault tolerance for these tables.
This provides a generic mechanism for keeping co-partitioned state, in arbitrary index types, local to the incoming stream data.
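A small sketch of a stateful processor may help here. It assumes the processor keeps its counts in a local table and journals each update to a changelog, so a restarted processor can rebuild the table by replay; the `CountingProcessor` class and its method names are hypothetical.

```python
class CountingProcessor:
    """Counts events per key, journaling every state change to a changelog."""

    def __init__(self):
        self.counts = {}     # local state (the "table")
        self.changelog = []  # log of state updates, used for recovery

    def process(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        # Record the new value so the state can be rebuilt after a failure.
        self.changelog.append((key, self.counts[key]))

    @classmethod
    def restore(cls, changelog):
        # Replay the changelog to recreate the local table.
        processor = cls()
        for key, count in changelog:
            processor.counts[key] = count
        processor.changelog = list(changelog)
        return processor


processor = CountingProcessor()
for page in ["home", "jobs", "home"]:
    processor.process(page)

# Simulate a crash: only the changelog survives.
recovered = CountingProcessor.restore(processor.changelog)
```

In this sketch the changelog lives in memory; in practice it would have to be stored somewhere that outlives the processor.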
For keyed data, though, a nice property of the complete log is that you can replay it to recreate the state of the source system (potentially recreating it in another system).
By doing this, we still guarantee that the log contains a complete backup of the source system, but now we can no longer recreate all previous states of the source system, only the more recent ones.
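The trade-off can be sketched as a compaction pass that keeps only the most recent record for each key. The `compact` function and sample log below are illustrative assumptions, not the algorithm of any specific system.

```python
def compact(log):
    # Remember the last offset and value seen for each key.
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    # Emit the surviving records in their original log order.
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_offset, value) in survivors]


def replay(log):
    # Rebuild the current state of the source system from a log.
    state = {}
    for key, value in log:
        state[key] = value
    return state


log = [("a", 1), ("b", 2), ("a", 3)]
compacted = compact(log)
```

Replaying the compacted log yields the same final state as replaying the full log, but the intermediate state in which `a` was 1 can no longer be reconstructed.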
There is an analogy here between the role a log serves for data flow inside a distributed database and the role it serves for data integration in a larger organization.
But these issues can be addressed by a good system: it is possible for an organization to have a single Hadoop cluster, for example, that contains all the data and serves a large and diverse constituency.
So there is already one possible simplification in the handling of data that has become possible in the move to distributed systems: coalescing lots of little instances of each system into a few big clusters.
This is clearly not a story relevant to end-users, who presumably care more about the API than how it is implemented, but it might be a path towards getting the simplicity of the single system in a more diverse and modular world that continues to evolve.
If the implementation time for a distributed system goes from years to weeks because reliable, flexible building blocks emerge, then the pressure to coalesce into a single monolithic system disappears.
This is exactly the part that should vary from system to system: for example, a full-text search query may need to query all partitions whereas a query by primary key may only need to query a single node responsible for that key's data.
The serving nodes store whatever index is required to serve queries (for example a key-value store might have something like a btree or sstable, a search system would have an inverted index).
The client can get read-your-write semantics from any node by providing the timestamp of a write as part of its query—a serving node receiving such a query will compare the desired timestamp to its own index point and if necessary delay the request until it has indexed up to at least that time to avoid serving stale data.
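The read path described above might look roughly like the following. The sketch uses the log offset as the "timestamp" and, rather than actually delaying the request, has the node catch up synchronously; both are simplifications, and `ServingNode` is a hypothetical class.

```python
class ServingNode:
    """Builds a key-value index by consuming a shared log."""

    def __init__(self, log):
        self.log = log
        self.index = {}
        self.index_point = 0  # number of log entries indexed so far

    def index_next(self):
        key, value = self.log[self.index_point]
        self.index[key] = value
        self.index_point += 1

    def read(self, key, min_timestamp):
        # Stand-in for "delay the request": index until the node has
        # caught up to at least the write the client is asking about.
        while self.index_point <= min_timestamp:
            self.index_next()
        return self.index.get(key)


# Hypothetical log of writes; a write's timestamp is its offset in the log.
log = [("x", 1), ("y", 2), ("x", 3)]
write_timestamp = 2  # the client just wrote ("x", 3)

node = ServingNode(log)
fresh = node.read("x", write_timestamp)

lagging_node = ServingNode(log)
stale = lagging_node.read("x", 0)  # without the newer timestamp, older data is served
```

Because the check uses only the node's own index point, the client can send the query to any node and still see its own write.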
I find this view of systems as factored into a log and a query API to be very revealing, as it lets you separate the query characteristics from the availability and consistency aspects of the system.
These systems feed off a database (using Databus as a log abstraction or off a dedicated log from Kafka) and provide a particular partitioning, indexing, and query capability on top of that data stream.
In fact, it is quite common to have a single data feed (whether a live feed or a derived feed coming from Hadoop) replicated into multiple serving systems for live serving.
None of these systems need to have an externally accessible write API at all: Kafka and databases are used as the system of record, and changes flow to the appropriate query systems through that log.
Everyone seems to use different terms for the same things, so it is a bit of a puzzle to connect the database literature to the distributed systems stuff to the various enterprise software camps to the open source world.