Shared posts

25 Dec 07:51

Analyze your Amazon CloudFront access logs at scale

by Steffen Grunwald
ps

save access logs to amazon S3

Many AWS customers are using Amazon CloudFront, a global content delivery network (CDN) service. It delivers websites, videos, and API operations to browsers and clients with low latency and high transfer speeds. Amazon CloudFront protects your backends from massive load or malicious requests by caching or a web application firewall. As a result, sometimes only a small fraction of all requests gets to your backends. You can configure Amazon CloudFront to store access logs with detailed information of every request to Amazon Simple Storage Service (S3). This lets you gain insight into your cache efficiency and learn how your customers are using your products.

A common choice to run standard SQL queries on your data in S3 is Amazon Athena. Queries analyze your data immediately without the prior setup of infrastructure or loading your data. You pay only for the queries that you run. Amazon Athena is ideal for quick, interactive querying. It supports complex analysis of your data, including large joins, unions, nested queries, and window functions.

This blog post shows you how you can restructure your Amazon CloudFront access logs storage to optimize the cost and performance for queries. It demonstrates common patterns that are also applicable to other sources of time series data.

Optimizing Amazon CloudFront access logs for Amazon Athena queries

There are two main aspects to optimize: cost and performance.

Cost should be low for both storage of your data and the queries. Access logs are stored in S3, which is billed by GB/ month. Thus, it makes sense to compress your data – especially when you want to keep your logs for a long time. Also cost incurs on queries. When you optimize the storage cost, usually the query cost follows. Access logs are delivered compressed by gzip and Amazon Athena can deal with compression. Amazon Athena is billed by the amount of compressed data scanned, so the benefits of compression are passed on to you as cost savings.

Queries further benefit from partitioning. Partitioning divides your table into parts and keeps the related data together based on column values. For time-based queries, you benefit from partitioning by year, month, day, and hour. In Amazon CloudFront access logs, this indicates the request time. Depending on your data and queries, you add further dimensions to partitions. For example, for access logs it could be the domain name that was requested. When querying your data, you specify filters based on the partition to make Amazon Athena scan less data.

Generally, performance improves by scanning less data. Conversion of your access logs to columnar formats reduces the data to scan significantly. Columnar formats retain all information but store values by column. This allows creation of dictionaries, and effective use of Run Length Encoding and other compression techniques. Amazon Athena can further optimize the amount of data to read, because it does not scan columns at all if a column is not used in a filter or the result of a query. Columnar formats also split a file into chunks and calculate metadata on file- and chunk level like the range (min/ max), count, or sum of values. If the metadata indicates that the file or chunk is not relevant for the query Amazon Athena skips it. In addition, if you know your queries and the information you are looking for, you can further aggregate your data (for example, by day) for improved performance of frequent queries.

This blog post focuses on two measures to restructure Amazon CloudFront access logs for optimization: partitioning and conversion to columnar formats. For more details on performance tuning read the blog post about the top 10 performance tuning tips for Amazon Athena.

This blog post describes the concepts of a solution and includes code excerpts for better illustration of the implementation. Visit the AWS Samples repository for a fully working implementation of the concepts. Launching the packaged sample application from the AWS Serverless Application Repository, you deploy it within minutes in one step:

Partitioning CloudFront Access Logs in S3

Amazon CloudFront delivers each access log file in CSV format to an S3 bucket of your choice. Its name adheres to the following format (for more information, see Configuring and Using Access Logs):

/optional-prefix/distribution-ID.YYYY-MM-DD-HH.unique-ID.gz

The file name includes the date and time of the period in which the requests occurred in Coordinated Universal time (UTC). Although you can specify an optional prefix for an Amazon CloudFront distribution, all access log files for a distribution are stored with the same prefix.

When you have a large amount of access log data, this makes it hard to only scan and process parts of it efficiently. Thus, you must partition your data. Most tools in the big data space (for example, the Apache Hadoop ecosystem, Amazon Athena, AWS Glue) can deal with partitioning using the Apache Hive style. A partition is a directory that is self-descriptive. The directory name not only reflects the value of a column but also the column name. For access logs this is a desirable structure:

/optional-prefix/year=YYYY/month=MM/day=DD/hour=HH/distribution-ID.YYYY-MM-DD-HH.unique-ID.gz

To generate this structure, the sample application initiates the processing of each file by an S3 event notification. As soon as Amazon CloudFront puts a new access log file to an S3 bucket, an event triggers the AWS Lambda function moveAccessLogs. This moves the file to a prefix corresponding to the filename. Technically, the move is a copy followed by deletion of the original file.

 

 

Migration of your Amazon CloudFront Access Logs

The deployment of the sample application contains a single S3 bucket called <StackName>-cf-access-logs. You can modify your existing Amazon CloudFront distribution configuration to deliver access logs to this bucket with the new/ log prefix. Files are moved to the canonical file structure for Amazon Athena partitioning as soon as they are put into the bucket.

To migrate all previous access log files, copy them manually to the new/ folder in the bucket. For example, you could copy the files by using the AWS Command Line Interface (AWS CLI). These files are treated the same way as the incoming files by Amazon CloudFront.

Load the Partitions and query your Access Logs

Before you can query the access logs in your bucket with Amazon Athena the AWS Glue Data Catalog needs metadata. On deployment, the sample application creates a table with the definition of the schema and the location. The new table is created by adding the partitioning information to the CREATE TABLE statement from the Amazon CloudFront documentation (mind the PARTITIONED BY clause):

CREATE EXTERNAL TABLE IF NOT EXISTS
    cf_access_logs.partitioned_gz (
         date DATE,
         time STRING,
         location STRING,
         bytes BIGINT,
         requestip STRING,
         method STRING,
         host STRING,
         uri STRING,
         status INT,
         referrer STRING,
         useragent STRING,
         querystring STRING,
         cookie STRING,
         resulttype STRING,
         requestid STRING,
         hostheader STRING,
         requestprotocol STRING,
         requestbytes BIGINT,
         timetaken FLOAT,
         xforwardedfor STRING,
         sslprotocol STRING,
         sslcipher STRING,
         responseresulttype STRING,
         httpversion STRING,
         filestatus STRING,
         encryptedfields INT 
)
PARTITIONED BY(
         year string,
         month string,
         day string,
         hour string )
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://<StackName>-cf-access-logs/partitioned-gz/'
TBLPROPERTIES ( 'skip.header.line.count'='2');

You can load the partitions added so far by running the metastore check (msck) statement via the Amazon Athena query editor. It discovers the partition structure in S3 and adds partitions to the metastore.

msck repair table cf_access_logs.partitioned_gz

You are now ready for your first query on your data in the Amazon Athena query editor:

SELECT SUM(bytes) AS total_bytes
FROM cf_access_logs.partitioned_gz
WHERE year = '2017'
AND month = '10'
AND day = '01'
AND hour BETWEEN '00' AND '11';

This query does not specify the request date (called date in a previous example) column of the table but the columns used for partitioning. These columns are dependent on date but the table definition does not specify this relationship. When you specify only the request date column, Amazon Athena scans every file as there is no hint which files contain the relevant rows and which files do not. By specifying the partition columns, Amazon Athena scans only a small subset of the total amount of Amazon CloudFront access log files. This optimizes both the performance and the cost of your queries. You can add further columns to the WHERE clause, such as the time to further narrow down the results.

To save cost, consider narrowing the scope of partitions down to a minimum by also putting the partitioning columns into the WHERE clause. You validate the approach by observing the amount of data that was scanned in the query execution statistics for your queries. These statistics are also displayed in the Amazon Athena query editor after your statement has been run:

Adding Partitions continuously

As Amazon CloudFront continuously delivers new access log data for requests, new prefixes for partitions are created in S3. However, Amazon Athena only queries the files contained in the known partitions, i.e. partitions that have been added before to the metastore. That’s why periodically triggering the msck command would not be the best solution. First, it is a time-consuming operation since Amazon Athena scans all S3 paths to validate and load your partitions. More importantly, this way you only add partitions that already have data delivered. Thus, there is some time period when the data exists in S3 but is not visible to Amazon Athena queries yet.

The sample application solves this by adding the partition for each hour in advance because partitions are just dependent on the request time. This way Amazon Athena scans files as soon as they exist in S3. A scheduled AWS Lambda function runs a statement like this:

ALTER TABLE cf_access_logs.partitioned_gz
ADD IF NOT EXISTS 
PARTITION (
    year = '2017',
    month = '10',
    day = '01',
    hour = '02' );

It can omit the specification of the canonical location attribute in this statement as it is automatically derived from the column values.

Conversion of the Access Logs to a Columnar Format

As mentioned previously, with columnar formats Amazon Athena skips scanning of data not relevant for a query resulting in less cost. Amazon Athena currently supports the columnar formats Apache ORC and Apache Parquet.

Key to the conversion is the Amazon Athena CREATE TABLE AS SELECT (CTAS) feature. A CTAS query creates a new table from the results of another SELECT query. Amazon Athena stores data files created by the CTAS statement in a specified location in Amazon S3. You can use CTAS to aggregate or transform the data, and to convert it into columnar formats. The sample application uses CTAS to hourly rewrite all logs from the CSV format to the Apache Parquet format. After this the resulting data will be added to a single partitioned table (the target table).

Creating the Target Table in Apache Parquet Format

The target table is a slightly modified version of the partitioned_gz table. Besides a different location the following table shows the different Serializer/Deserializer (SerDe) configuration for Apache Parquet:

CREATE EXTERNAL TABLE `cf_access_logs.partitioned_parquet`(
  `date` date, 
  `time` string, 
  `location` string, 
  `bytes` bigint, 
  `requestip` string, 
  `method` string, 
  `host` string, 
  `uri` string, 
  `status` int, 
  `referrer` string, 
  `useragent` string, 
  `querystring` string, 
  `cookie` string, 
  `resulttype` string, 
  `requestid` string, 
  `hostheader` string, 
  `requestprotocol` string, 
  `requestbytes` bigint, 
  `timetaken` float, 
  `xforwardedfor` string, 
  `sslprotocol` string, 
  `sslcipher` string, 
  `responseresulttype` string, 
  `httpversion` string, 
  `filestatus` string, 
  `encryptedfields` int)
PARTITIONED BY ( 
  `year` string, 
  `month` string, 
  `day` string, 
  `hour` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<StackName>-cf-access-logs/partitioned-parquet'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'parquet.compression'='SNAPPY')

Transformation to Apache Parquet by the CTAS Query

The sample application provides a scheduled AWS Lambda function transformPartition that runs a CTAS query on a single partition per run, taking one hour of data into account. The target location for the Apache Parquet files is the Apache Hive style path in the location of the partitioned_parquet table.

 

 

The files written to S3 are important but the table in the AWS Glue Data Catalog for this data is just a by-product. Hence the function drops the CTAS table immediately and create the corresponding partition in the partitioned_parquet table instead.

CREATE TABLE cf_access_logs.ctas_2017_10_01_02
WITH ( format='PARQUET',
    external_location='s3://<StackName>-cf-access-logs/partitioned_parquet/year=2017/month=10/day=01/hour=02',
    parquet_compression = 'SNAPPY')
AS SELECT *
FROM cf_access_logs.partitioned_gz
WHERE year = '2017'
    AND month = '10'
    AND day = '01'
    AND hour = '02';

DROP TABLE cf_access_logs.ctas_2017_10_01_02;

ALTER TABLE cf_access_logs.partitioned_parquet
ADD IF NOT EXISTS 
PARTITION (
    year = '2017',
    month = '10',
    day = '01',
    hour = '02' );

The statement should be run as soon as new data is written. Amazon CloudFront usually delivers the log file for a time period to your Amazon S3 bucket within an hour of the events that appear in the log. The sample application schedules the transformPartition function hourly to transform the data for the hour before the previous hour.

Some or all log file entries for a time period can sometimes be delayed by up to 24 hours. If you must mitigate this case, you delete and recreate a partition after that period. Also if you migrated partitions from previous Amazon CloudFront access logs, run the transformPartition function for each partition. The sample applications only transforms continuously added files.

When all files of a gzip partition are converted to Apache Parquet, you can save cost by getting rid of data that you do not need. Use the Lifecycle Policies in S3 to archive the gzip files in a cheaper storage class or delete them after a specific amount of days.

Query data over Multiple Tables

You now have two derived tables from the original Amazon CloudFront access log data:

  • partitioned_gz contains gzip compressed CSV files that are added as soon as new files are delivered.
  • Access logs in partitioned_parquet are written after one hour latest. A rough assumption is that the CTAS query takes a maximum of 15 minutes to transform a gzip partition. You must measure and confirm this assumption. Depending on the data size, this can be much faster.

The following diagram shows how the complete view on all data is composed of the two tables. The last complete partition of Apache Parquet files ends before the current time minus the transformation duration and the duration until Amazon CloudFront delivers the access log files.

For convenience the sample application creates the Amazon Athena view combined as a union of both tables. It includes an additional column called file. This is the file that stores the row.

CREATE OR REPLACE VIEW cf_access_logs.combined AS
SELECT *, "$path" AS file
FROM cf_access_logs.partitioned_gz
WHERE concat(year, month, day, hour) >=
       date_format(date_trunc('hour', (current_timestamp -
       INTERVAL '15' MINUTE - INTERVAL '1' HOUR)), '%Y%m%d%H')
UNION ALL SELECT *, "$path" AS file
FROM cf_access_logs.partitioned_parquet
WHERE concat(year, month, day, hour) <
       date_format(date_trunc('hour', (current_timestamp -
       INTERVAL '15' MINUTE - INTERVAL '1' HOUR)), '%Y%m%d%H')

Now you can query the data from the view to take advantage of the columnar based file partitions automatically. As mentioned before, you should add the partition columns (year, month, day, hour) to your statement to limit the files Amazon Athena scans.

SELECT SUM(bytes) AS total_bytes
FROM cf_access_logs.combined
WHERE year = '2017'
   AND month = '10'
   AND day = '01'

Summary

In this blog post, you learned how to optimize the cost and performance of your Amazon Athena queries with two steps. First, you divide the overall data into small partitions. This allows queries to run much faster by reducing the number of files to scan. The second step converts each partition into a columnar format to reduce storage cost and increase the efficiency of scans by Amazon Athena.

The results of both steps are combined in a single view for convenient interactive queries by you or your application. All data is partitioned by the time of the request. Thus, this format is best suited for interactive drill-downs into your logs for which the columns are limited and the time range is known. This way, it complements the Amazon CloudFront reports, for example, by providing easy access to:

  • Data from more than 60 days in the past
  • The distribution of detailed HTTP status codes (for example, 200, 403, 404) on a certain day or hour
  • Statistics based on the URI paths
  • Statistics of objects that are not listed in Amazon CloudFront’s 50 most popular objects report
  • A drill down into the attributes of each request

We hope you find this blog post and the sample application useful also for other types of time series data beside Amazon CloudFront access logs. Feel free to submit enhancements to the example application in the source repository or provide feedback in the comments.

 


About the Author

Steffen Grunwald is a senior solutions architect with Amazon Web Services. Supporting German enterprise customers on their journey to the cloud, he loves to dive deep into application architectures and development processes to drive performance, operational efficiency, and increase the speed of innovation.

 

 

 

 

23 Nov 04:39

Well-Architected Lens: Focus on Specific Workload Types

by Philip Fitzsimons

Customers have been building their innovations on AWS for over 11 years. During that time, our solutions architects have conducted tens of thousands of architecture reviews for our customers. In 2012 we created the “Well-Architected” initiative to share with you best practices for building in the cloud, and started publishing them in 2015. We recently released an updated Framework whitepaper, and a new Operational Excellence Pillar whitepaper to reflect what we learned from working with customers every day. Today, we are pleased to announce a new concept called a “lens” that allows you to focus on specific workload types from the well-architected perspective.

A well-architected review looks at a workload from a general technology perspective, which means it can’t provide workload-specific advice. For example, there are additional best practices when you are building high-performance computing (HPC) or serverless applications. Therefore, we created the concept of a lens to focus on what is different for those types of workloads.

In each lens, we document common scenarios we see — specific to that workload — providing reference architectures and a walkthrough. The lens also provides design principles to help you understand how to architect these types of workloads for the cloud, and questions for assessing your own architecture.

Today, we are releasing two lenses:

Well-Architected: High-Performance Computing (HPC) Lens <new>
Well-Architected: Serverless Applications Lens <new>

We expect to create more lenses over time, and evolve them based on customer feedback.

Philip Fitzsimons, Leader, AWS Well-Architected Team

18 Dec 12:39

Searx – A privacy-respecting, hackable metasearch engine

14 Dec 11:39

C语言中关于时间的函数

by 中二饼

一.概念 

在C/C++中,通过学习许多C/C++库,你可以有很多操作、使用时间的方法。但在这之前你需要了解一些“时间”和“日期”的概念,主要有以下几个:

1. 协调世界时,又称为世界标准时间,也就是大家所熟知的格林威治标准时间(Greenwich Mean Time,GMT)。比如,中国内地的时间与UTC的时差为+8,也就是UTC+8。美国是UTC-5。

2. 日历时间,是用“从一个标准时间点到此时的时间经过的秒数”来表示的时间。这个标准时间点对不同的编译器来说会有所不同,但对一个编译系统来说,这个标准时间点是不变的,该编译系统中的时间对应的日历时间都通过该标准时间点来衡量,所以可以说日历时间是“相对时间”,但是无论你在哪一个时区,在同一时刻对同一个标准时间点来说,日历时间都是一样的。

3. 时间点。时间点在标准C/C++中是一个整数,它用此时的时间和标准时间点相差的秒数(即日历时间)来表示。

4. 时钟计时单元(而不把它叫做时钟滴答次数),一个时钟计时单元的时间长短是由CPU控制的。一个clock tick不是CPU的一个时钟周期,而是C/C++的一个基本计时单位。

我们可以使用ANSI标准库中的time.h头文件。这个头文件中定义的时间和日期所使用的方法,无论是在结构定义,还是命名,都具有明显的C语言风格。下面,我将说明在C/C++中怎样使用日期的时间功能。

二. 介绍

1. 计时

C/C++中的计时函数是clock(),而与其相关的数据类型是clock_t。在MSDN中,查得对clock函数定义如下:

clock_t clock( void );

这个函数返回从“开启这个程序进程”到“程序中调用clock()函数”时之间的CPU时钟计时单元(clock tick)数,在MSDN中称之为挂钟时间(wal-clock)。其中clock_t是用来保存时间的数据类型,在time.h文件中,我们可以找到对它的定义:

#ifndef _CLOCK_T_DEFINED 
typedef long clock_t; 
#define _CLOCK_T_DEFINED 
#endif

很明显,clock_t是一个长整形数。在time.h文件中,还定义了一个常量CLOCKS_PER_SEC,它用来表示一秒钟会有多少个时钟计时单元,其定义如下:

#define CLOCKS_PER_SEC ((clock_t)1000)

可以看到每过千分之一秒(1毫秒),调用clock()函数返回的值就加1。下面举个例子,你可以使用公式clock()/CLOCKS_PER_SEC来计算一个进程自身的运行时间:

void elapsed_time() 
{ 
printf("Elapsed time:%u secs.\n",clock()/CLOCKS_PER_SEC); 
}

当然,你也可以用clock函数来计算你的机器运行一个循环或者处理其它事件到底花了多少时间:

/* 测量一个事件持续的时间*/ 
/* Date : 10/24/2007 */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 
int main( void ) 
{ 
    long i = 10000000L; 
    clock_t start, finish; 
    double duration; 
    /* 测量一个事件持续的时间*/ 
    printf( "Time to do %ld empty loops is ", i ); 
    start = clock(); 
    while( i-- ) ; 
    finish = clock(); 
    duration = (double)(finish - start) / CLOCKS_PER_SEC; 
    printf( "%f seconds\n", duration ); 
    system("pause"); 
}

在笔者的机器上,运行结果如下:

Time to do 10000000 empty loops is 0.03000 seconds

上面我们看到时钟计时单元的长度为1毫秒,那么计时的精度也为1毫秒,那么我们可不可以通过改变CLOCKS_PER_SEC的定义,通过把它定义的大一些,从而使计时精度更高呢?通过尝试,你会发现这样是不行的。在标准C/C++中,最小的计时单位是一毫秒。

2.与日期和时间相关的数据结构

在标准C/C++中,我们可通过tm结构来获得日期和时间,tm结构在time.h中的定义如下:

#ifndef _TM_DEFINED 
struct tm { 
int tm_sec; /* 秒 – 取值区间为[0,59] */ 
int tm_min; /* 分 - 取值区间为[0,59] */ 
int tm_hour; /* 时 - 取值区间为[0,23] */ 
int tm_mday; /* 一个月中的日期 - 取值区间为[1,31] */ 
int tm_mon; /* 月份(从一月开始,0代表一月) - 取值区间为[0,11] */ 
int tm_year; /* 年份,其值等于实际年份减去1900 */ 
int tm_wday; /* 星期 – 取值区间为[0,6],其中0代表星期天,1代表星期一,以此类推 */ 
int tm_yday; /* 从每年的1月1日开始的天数 – 取值区间为[0,365],其中0代表1月1日,1代表1月2日,以此类推 */ 
int tm_isdst; /* 夏令时标识符,实行夏令时的时候,tm_isdst为正。不实行夏令时的进候,tm_isdst为0;不了解情况时,tm_isdst()为负。*/ 
}; 
#define _TM_DEFINED 
#endif

ANSI C标准称使用tm结构的这种时间表示为分解时间(broken-down time)。

而日历时间(Calendar Time)是通过time_t数据类型来表示的,用time_t表示的时间(日历时间)是从一个时间点(例如:1970年1月1日0时0分0秒)到此时的秒数。在time.h中,我们也可以看到time_t是一个长整型数:

#ifndef _TIME_T_DEFINED 
typedef long time_t; /* 时间值 */ 
#define _TIME_T_DEFINED /* 避免重复定义 time_t */ 
#endif

大家可能会产生疑问:既然time_t实际上是长整型,到未来的某一天,从一个时间点(一般是1970年1月1日0时0分0秒)到那时的秒数(即日历时间)超出了长整形所能表示的数的范围怎么办?对time_t数据类型的值来说,它所表示的时间不能晚于2038年1月18日19时14分07秒。为了能够表示更久远的时间,一些编译器厂商引入了64位甚至更长的整形数来保存日历时间。比如微软在Visual C++中采用了__time64_t数据类型来保存日历时间,并通过_time64()函数来获得日历时间(而不是通过使用32位字的time()函数),这样就可以通过该数据类型保存3001年1月1日0时0分0秒(不包括该时间点)之前的时间。

在time.h头文件中,我们还可以看到一些函数,它们都是以time_t为参数类型或返回值类型的函数:

double difftime(time_t time1, time_t time0); 
time_t mktime(struct tm * timeptr); 
time_t time(time_t * timer); 
char * asctime(const struct tm * timeptr); 
char * ctime(const time_t *timer);

此外,time.h还提供了两种不同的函数将日历时间(一个用time_t表示的整数)转换为我们平时看到的把年月日时分秒分开显示的时间格式tm:

struct tm * gmtime(const time_t *timer); 
struct tm * localtime(const time_t * timer);

通过查阅MSDN,我们可以知道Microsoft C/C++ 7.0中时间点的值(time_t对象的值)是从1899年12月31日0时0分0秒到该时间点所经过的秒数,而其它各种版本的Microsoft C/C++和所有不同版本的Visual C++都是计算的从1970年1月1日0时0分0秒到该时间点所经过的秒数。

3.与日期和时间相关的函数及应用 

在本节,我将向大家展示怎样利用time.h中声明的函数对时间进行操作。这些操作包括取当前时间、计算时间间隔、以不同的形式显示时间等内容。

4.  获得日历时间

我们可以通过time()函数来获得日历时间(Calendar Time),其原型为:

time_t time(time_t * timer);

如果你已经声明了参数timer,你可以从参数timer返回现在的日历时间,同时也可以通过返回值返回现在的日历时间,即从一个时间点(例如:1970年1月1日0时0分0秒)到现在此时的秒数。如果参数为空(NUL),函数将只通过返回值返回现在的日历时间,比如下面这个例子用来显示当前的日历时间:

运行的结果与当时的时间有关,我当时运行的结果是:

/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 

int main(void) 
{ 
    time_t lt; 
    lt =time(NULL); 
    printf("The Calendar Time now is %d\n",lt); 
    return 0; 
}

The Calendar Time now is 1122707619

其中1122707619就是我运行程序时的日历时间。即从1970-01-01 08:00:00到此时的秒数。

5.  获得日期和时间

这里说的日期和时间就是我们平时所说的年、月、日、时、分、秒等信息。从第2节我们已经知道这些信息都保存在一个名为tm的结构体中,那么如何将一个日历时间保存为一个tm结构的对象呢?

其中可以使用的函数是gmtime()和localtime(),这两个函数的原型为:

struct tm * gmtime(const time_t *timer); 
struct tm * localtime(const time_t * timer);

其中gmtime()函数是将日历时间转化为世界标准时间(即格林尼治时间),并返回一个tm结构体来保存这个时间,而localtime()函数是将日历时间转化为本地时间。比如现在用gmtime()函数获得的世界标准时间是2005年7月30日7点18分20秒,那么我用localtime()函数在中国地区获得的本地时间会比世界标准时间晚8个小时,即2005年7月30日15点18分20秒。下面是个例子:

//本地时间,世界标准时间
/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 

int main(void) 
{ 
    struct tm *local; 
    time_t t; 
    t=time(NULL); 
    local=localtime(&t); 
    printf("Local hour is: %d:%d:%d\n",local->tm_hour,local->tm_min,local->tm_sec); 
    local=gmtime(&t); 
    printf("UTC hour is:  %d:%d:%d\n",local->tm_hour,local->tm_min,local->tm_sec); 
    return 0; 
}

运行结果是:

Local hour is: 23:17:47

UTC hour is: 15:17:47

6.  固定的时间格式

我们可以通过asctime()函数和ctime()函数将时间以固定的格式显示出来,两者的返回值都是char*型的字符串。返回的时间格式为:

星期几 月份 日期 时:分:秒 年\n\0
例如:Wed Jan 02 02:03:55 1980\n\0

其中\n是一个换行符,\0是一个空字符,表示字符串结束。下面是两个函数的原型:

char * asctime(const struct tm * timeptr); 
char * ctime(const time_t *timer);

其中asctime()函数是通过tm结构来生成具有固定格式的保存时间信息的字符串,而ctime()是通过日历时间来生成时间字符串。这样的话,asctime()函数只是把tm结构对象中的各个域填到时间字符串的相应位置就行了,而ctime()函数需要先参照本地的时间设置,把日历时间转化为本地时间,然后再生成格式化后的字符串。在下面,如果t是一个非空的time_t变量的话,那么:

printf(ctime(&t));

等价于:

struct tm *ptr; 
ptr=localtime(&t); 
printf(asctime(ptr));

那么,下面这个程序的两条printf语句输出的结果就是不同的了(除非你将本地时区设为世界标准时间所在的时区):

//本地时间,世界标准时间
/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 

int main(void) 
{ 
    struct tm *ptr; 
    time_t lt; 
    lt =time(NULL); 
    ptr=gmtime(&lt); 
    printf(asctime(ptr)); 
    printf(ctime(&lt)); 
    return 0; 
}

运行结果:

Sat Jul 30 08:43:03 2005
Sat Jul 30 16:43:03 2005

7.  自定义时间格式

我们可以使用strftime()函数将时间格式化为我们想要的格式。它的原型如下:

size_t strftime( 
char *strDest, 
size_t maxsize, 
const char *format, 
const struct tm *timeptr 
);

我们可以根据format指向字符串中格式命令把timeptr中保存的时间信息放在strDest指向的字符串中,最多向strDest中存放maxsize个字符。该函数返回向strDest指向的字符串中放置的字符数。

函数strftime()的操作有些类似于sprintf():识别以百分号(%)开始的格式命令集合,格式化输出结果放在一个字符串中。格式化命令说明串strDest中各种日期和时间信息的确切表示方法。格式串中的其他字符原样放进串中。格式命令列在下面,它们是区分大小写的。

%a 星期几的简写
%A 星期几的全称
%b 月分的简写
%B 月份的全称
%c 标准的日期的时间串
%C 年份的后两位数字
%d 十进制表示的每月的第几天
%D 月/天/年
%e 在两字符域中,十进制表示的每月的第几天
%F 年-月-日
%g 年份的后两位数字,使用基于周的年
%G 年分,使用基于周的年
%h 简写的月份名
%H 24小时制的小时
%I 12小时制的小时
%j 十进制表示的每年的第几天
%m 十进制表示的月份
%M 十时制表示的分钟数
%n 新行符
%p 本地的AM或PM的等价显示
%r 12小时的时间
%R 显示小时和分钟:hh:mm
%S 十进制的秒数
%t 水平制表符
%T 显示时分秒:hh:mm:ss
%u 每周的第几天,星期一为第一天 (值从0到6,星期一为0)
%U 第年的第几周,把星期日做为第一天(值从0到53)
%V 每年的第几周,使用基于周的年
%w 十进制表示的星期几(值从0到6,星期天为0)
%W 每年的第几周,把星期一做为第一天(值从0到53)
%x 标准的日期串
%X 标准的时间串
%y 不带世纪的十进制年份(值从0到99)
%Y 带世纪部分的十进制年份
%z,%Z 时区名称,如果不能得到时区名称则返回空字符。
%% 百分号

如果想显示现在是几点了,并以12小时制显示,就象下面这段程序:

//显示现在是几点了,并以12小时制显示
/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 

int main(void) 
{ 
    struct tm *ptr; 
    time_t localTime; 
    char str[80]; 
    localTime=time(NULL); 
    ptr=localtime(&localTime); 
    strftime(str,100,"It is now %I %p\n",ptr); 
    printf(str); 
    return 0; 
}

其运行结果为:

It is now 4PM

而下面的程序则显示当前的完整日期:

//显示当前的完整日期
/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 

void main( void ) 
{ 
    struct tm *newtime; 
    char tmpbuf[128]; 
    time_t localTime1; 
    time( &localTime1 ); 
    newtime=localtime(&localTime1); 
    strftime( tmpbuf, 128, "Today is %A, day %d of %B in the year %Y.\n", newtime); 
    printf(tmpbuf); 
}

运行结果:

Today is Saturday, day 30 of July in the year 2005.

8.  计算持续时间的长度

有时候在实际应用中要计算一个事件持续的时间长度,比如计算打字速度。在第1节计时部分中,我已经用clock函数举了一个例子。Clock()函数可以精确到毫秒级。同时,我们也可以使用difftime()函数,但它只能精确到秒。该函数的定义如下:

double difftime(time_t time1, time_t time0);

虽然该函数返回的以秒计算的时间间隔是double类型的,但这并不说明该时间具有同double一样的精确度,这是由它的参数觉得的(time_t是以秒为单位计算的)。比如下面一段程序:

//计算持续时间的长度 
/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 
int main(void) 
{ 
    time_t start,end; 
    start = time(NULL); 
    system("pause"); 
    end = time(NULL); 
    printf("The pause used %f seconds.\n",difftime(end,start));//<- 
    system("pause"); 
    return 0; 
}

运行结果为:

请按任意键继续. . .
The pause used 2.000000 seconds.
请按任意键继续. . .

可以想像,暂停的时间并不那么巧是整整2秒钟。其实,你将上面程序的带有“//<-”注释的一行用下面的一行代码替换:

printf("The pause used %f seconds.\n",end-start);

其运行结果是一样的。

9.  分解时间转化为日历时间

这里说的分解时间就是以年、月、日、时、分、秒等分量保存的时间结构,在C/C++中是tm结构。我们可以使用mktime()函数将用tm结构表示的时间转化为日历时间。其函数原型如下:

time_t mktime(struct tm * timeptr);

其返回值就是转化后的日历时间。这样我们就可以先制定一个分解时间,然后对这个时间进行操作了,下面的例子可以计算出1997年7月1日是星期几:

//计算出1997年7月1日是星期几 
/* Date : 10/24/2007 */
/* Author: Eman Lee  */
#include "stdio.h" 
#include "stdlib.h" 
#include "time.h" 
int main(void) 
{ 
    struct tm time; 
    time_t t_of_day; 
    time.tm_year=1997-1900; 
    time.tm_mon=6; 
    time.tm_mday=1; 
    time.tm_hour=0; 
    time.tm_min=0; 
    time.tm_sec=1; 
    time.tm_isdst=0; 
    t_of_day=mktime(&time); 
    printf(ctime(&t_of_day)); 
    return 0; 
}

运行结果:
Tue Jul 01 00:00:01 1997

有了mktime()函数,是不是我们可以操作现在之前的任何时间呢?你可以通过这种办法算出1945年8月15号是星期几吗?答案是否定的。因为这个时间在1970年1月1日之前,所以在大多数编译器中,这样的程序虽然可以编译通过,但运行时会异常终止。

注:linux系统时间如果转换为 time_t 类型,都是从1970-01-01 08:00:00 开始计算

C语言中关于时间的函数,首发于文章 - 伯乐在线

24 Jul 07:54

EGADS: A Scalable, Configurable, and Novel Anomaly Detection System

by davglass
EGADS: A Scalable, Configurable, and Novel Anomaly Detection System:

yahoolabs:

By Nikolay Laptev, Saeed Amizadeh, Youssef Billawala

As new Yahoo Mail or Flickr features go from a prototype to full production, monitoring vital system components for problems (i.e. anomalies) is critical. Manually setting static thresholds to detect big spikes in traffic becomes…

30 Mar 11:25

怎样在 Linux 中限制网络带宽使用

by Leo

假如你经常在 Linux 桌面上运行多个网络应用,或在家中让多台电脑共享带宽;那么你可能想更好地控制带宽的使用。否则,当你使用下载器下载一个大文件时,交互式 SSH 会话可能会变得缓慢以至不可用;或者当你通过 Dropbox 来同步一个大文件夹时,你的室友可能会抱怨在她的电脑上,视频流变得断断续续。

在本教程中,我将为你描述两种在 Linux 中限制网络流量速率的不同方法。

在 Linux 中限制一个应用的速率

限制网络流量速率的一种方法是通过一个名为trickle的命令行工具。通过在程序运行时,预先加载一个速率限制 socket 库 的方法,trickle 命令允许你改变任意一个特定程序的流量。 trickle 命令有一个很好的特性是它仅在用户空间中运行,这意味着,你不必需要 root 权限就可以限制一个程序的带宽使用。要能使用 trickle 程序控制程序的带宽,这个程序就必须使用非静态链接库的套接字接口。当你想对一个不具有内置带宽控制功能的程序进行速率限制时,trickle 可以帮得上忙。

在 Ubuntu,Debian 及其衍生发行版中安装 trickle :

$ sudo apt-get install trickle

在 Fdora 或 CentOS/RHEL (带有 EPEL 软件仓库):

$ sudo yum install trickle

trickle 的基本使用方法如下。仅需简单地把 trickle 命令(及速率参数)放在你想运行的命令之前。

$ trickle -d <download-rate> -u <upload-rate> <command>

这就可以将 <command> 的下载和上传速率限定为特定值(单位 KBytes/s)。

例如,将你的 scp 会话的最大上传带宽设定为 100 KB/s:

$ trickle -u 100 scp backup.tgz alice@remote_host.com:

如若你想,你可以通过创建一个自定义启动器的方式,使用下面的命令为你的 Firefox 浏览器设定最大下载速率(例如, 300 KB/s)。

trickle -d 300 firefox %u

最后, trickle 也可以以守护进程模式运行,在该模式下,它将会限制所有通过 trickle 启动且正在运行的程序的总带宽之和。 启动 trickle 使其作为一个守护进程(例如, trickled):

trickle -d 300 firefox %u

一旦 trickled 守护进程在后台运行,你便可以通过 trickle 命令来启动其他程序。假如你通过 trickle 启动一个程序,那么这个程序的最大下载速率将是 1000 KB/s, 假如你再通过 trickle 启动了另一个程序,则每个程序的(下载)速率极限将会被限制为 500 KB/s,等等。

在 Linux 中限制一个网络接口的速率

另一种控制你的带宽资源的方式是在每一个接口上限制带宽。这在你与其他人分享你的网络连接的上行带宽时尤为实用。同其他一样,Linux 有一个工具来为你做这件事。wondershaper就是干这个的。

wondershaper 实际上是一个 shell 脚本,它使用 tc 来定义流量调整命令,使用 QoS 来处理特定的网络接口。外发流量通过放在不同优先级的队列中,达到限制传出流量速率的目的;而传入流量通过丢包的方式来达到速率限制的目的。

事实上, wondershaper 的既定目标不仅仅是对一个接口增加其带宽上限;当批量下载或上传正在进行时,wondershaper 还试图去保持互动性会话如 SSH 的低延迟。同样的,它还会控制批量上传(例如, Dropbox 的同步)不会使得下载“窒息”,反之亦然。

在 Ubuntu Debian 及其衍生发行版中安装 wondershaper:

trickle -d 300 firefox %u

在 Fdora 或 CentOS/RHEL (带有 EPEL 软件仓库) 中安装 wondershaper:

trickle -d 300 firefox %u

wondershaper 的基本使用如下:

$ sudo wondershaper <interface> <download-rate> <upload-rate>

举个例子, 将 eth0 的最大下载/上传带宽分别设定为 1000Kbit/s 和 500Kbit/s:

$ sudo wondershaper <interface> <download-rate> <upload-rate>

你也可以通过运行下面的命令将速率限制进行消除:

$ sudo wondershaper <interface> <download-rate> <upload-rate>

假如你对 wondershaper 的运行原理感兴趣,你可以阅读其 shell 脚本源文件(/sbin/wondershaper)。

总结

在本教程中,我介绍了两种不同的方法,来达到如何在 Linux 桌面环境中,控制每个应用或每个接口的带宽使用的目的。 这些工具的使用都很简单,都为用户提供了一个快速且容易的方式来调整或限制流量。 对于那些想更多地了解如何在 Linux 中进行速率控制的读者,请参考 the Linux bible.

怎样在 Linux 中限制网络带宽使用,首发于博客 - 伯乐在线

22 Jan 12:06

关于大型网站技术演进的思考(1):存储的瓶颈(上)

by zzr0427

前不久公司请来了位互联网界的技术大牛跟我们做了一次大型网站架构的培训,两天12个小时信息量非常大,知识的广度和难度也非常大,培训完后我很难完整理出全部听到的知识,今天我换了个思路是回味这次培训,这个思路就是通过本人目前的经验和技术水平来思考下大型网站技术演进的过程。

首先我们要思考一个问题,什么样的网站才是大型网站,从网站的技术指标角度考虑这个问题人们很容易犯一个毛病就是认为网站的访问量是衡量的指标,懂点行的人也许会认为是网站在单位时间里的并发量的大小来作为指标,如果按这些标准那么像hao123这样的网站就是大型网站了,如下图所示:

其实这种网站访问量非常大,并发数也非常高,但是它却能用最为简单的web技术来实现:我们只要保持网站的充分的静态化,多部署几台服务器,那么就算地球上所有人都用它,网站也能正常运行。

我觉得大型网站是技术和业务的结合,一个满足某些用户需求的网站只要技术和业务二者有一方难度很大,必然会让企业投入更多的、更优秀的人力成本实现它,那么这样的网站就是所谓的大型网站了。

一个初建的网站往往用户群都是很小的,最简单的网站架构就能解决实际的用户需求,当然为了保证网站的稳定性和安全性,我们会把网站的应用部署到至少两台机器上,后台的存储使用数据库,如果经济实力允许,数据库使用单台服务器部署,由于数据是网站的生命线,因此我们常常会把部署数据库的服务器使用的好点,这个网站结构如下所示:

这个结构非常简单,其实大部分初建网站开发里往往业务逻辑没有企业级系统那么复杂,所以只要有个好的idea,建设一个新网站的成本是非常低的,所使用的技术手段也是非常的基本和简单,不过该图我们要准备三台服务器,而且还要租个机房放置我们的服务器,这些成本对于草根和屌丝还是非常高的,幸运的是当下很多大公司和机构提供了云平台,我们可以花费很少的钱将自己的应用部署到云平台上,这种做法我们甚至不用去考虑把应用、数据库分开部署的问题,更加进一步的降低了网站开发和运维的成本,但是这种做法也有一个问题,就是网站的小命被这个云平台捏住了,如果云平台挂了,俺们的网站服务也就跟着挂了。

这里我先讲讲自己独立使用服务器部署网站的问题,如果我们要把网站服务应用使用多台服务器部署,这么做的目的一般有两个:

  1. 保证网站的可用性,多台服务器部署应用,那么其中一些服务器挂掉了,只要网站还有服务器能正常运转,那么网站对外任然可以正常提供服务。
  2. 提高网站的并发量,服务器越多那么网站能够服务的用户,单位时间内能承载的请求数也就越大。

不过要做到以上两点,并不是我们简单将网站分开部署就可以满足的,因为大多数网站在用户使用时候都是要保持用户的状态,具体点就是网站要记住请求是归属到那一个客户端,而这个状态在网站开发里就是通过会话session来体现的。分开部署的web应用服务要解决的一个首要问题就是要保持不同物理部署服务器之间的session同步问题,从而达到当用户第一次请求访问到服务器A,第二个请求访问到服务器B,网站任然知道这两个请求是同一个人,解决方案很直接:服务器A和服务器B上的session信息要时刻保持同步,那么如何保证两台服务器之间session信息的同步呢?

为了回答上面的问题,我们首先要理解下session的机制,session信息在web容器里都是存储在内存里的,web容器会给每个连接它的客户端生成一个sessionid值,这个sessionid值会被web容器置于http协议里的cookie域下,当响应被客户端处理后,客户端本地会存储这个sessionid值,用户以后的每个请求都会让这个sessionid值随cookie一起传递到服务器,服务器通过sessionid找到内存中存储的该用户的session内容,session在内存的数据结构是一个map的格式。那么为了保证不同服务器之间的session共享,那么最直接的方案就是让服务器之间session不断的传递和复制,例如java开发里常用的tomcat容器就采用这种方案,以前我测试过tomcat这种session同步的性能,我发现当需要同步的web容器越多,web应用所能承载的并发数并没有因为服务器的增加而线性提升,当服务器数量达到一个临界值后,整个web应用的并发数甚至还会下降,为什么会这样了?

原因很简单,不同服务器之间session的传递和复制会消耗服务器本身的系统资源,当服务器数量越大,消耗的资源越多,当用户请求越频繁,系统消耗资源也会越来越大。如果我们多部署服务器的目的只是想保证系统的稳定性,采用这种方案还是不错的,不过web应用最好部署少点,这样才不会影响到web应用的性能问题,如果我们还想提升网站的并发量那么就得采取其他的方案了。

时下使用的比较多的方案就是使用独立的缓存服务器,也就是将session的数据存储在一台独立的服务器上,如果觉得存在一台服务器不安全,那么可以使用memcached这样的分布式缓存服务器进行存储,这样既可以满足了网站稳定性问题也提升了网站的并发能力。

不过早期的淘宝在这个问题解决更加巧妙,他们将session的信息直接存储到浏览器的cookie里,每次请求cookie信息都会随着http一起传递到web服务器,这样就避免了web服务器之间session信息同步的问题,这种方案会让很多人诟病,诟病的原因是cookie的不安全性是总所周知的,如果有人恶意截取cookie信息那么网站不就不安全了吗?这个答案还真不好说,但是我觉得我们仅仅是跟踪用户的状态,把session存在cookie里其实也没什么大不了的。

其实如此专业的淘宝这么做其实还是很有深意的,还记得本文开篇提到的hao123网站,它是可以承载高并发的网站,它之所以可以做到这一点,原因很简单它是个静态网站,静态网站的特点就是不需要记录用户的状态,静态网站的服务器不需要使用宝贵的系统资源来存储大量的session会话信息,这样它就有更多系统资源来处理请求,而早期淘宝将cookie存在客户端也是为了达到这样的目的,所以这个方案在淘宝网站架构里还是使用了很长时间的。

在我的公司里客户端的请求到达web服务器之前,会先到F5,F5是一个用来做负载均衡的硬件设备,它的作用是将用户请求均匀的分发到后台的服务器集群,F5是硬件的负载均衡解决方案,如果我们没那么多钱买这样的设备,也有软件的负载均衡解决方案,这个方案就是大名鼎鼎的LVS了,这些负载均衡设备除了可以分发请求外它们还有个能力,这个能力是根据http协议的特点设计的,一个http请求从客户端到达最终的存储服务器之前可能会经过很多不同的设备,如果我们把一个请求比作高速公路上的一辆汽车,这些设备也可以叫做这些节点就是高速路上的收费站,这些收费站都能根据自己的需求改变http报文的内容,所以负载均衡设备可以记住每个sessionid值对应的后台服务器,当一个带有sessionid值的请求通过负载均衡设备时候,负载均衡设备会根据该sessionid值直接找到指定的web服务器,这种做法有个专有名词就是session粘滞,这种做法也比那种session信息在不同服务器之间拷贝复制要高效,不过该做法还是比存cookie的效率低下,而且对于网站的稳定性也有一定影响即如果某台服务器挂掉了,那么连接到该服务器的用户的会话都会失效。

解决session的问题的本质也就是解决session的存储问题,其本质也就是解决网站的存储问题,一个初建的网站在早期的运营期需要解决的问题基本都是由存储导致的。上文里我提到时下很多新建的web应用会将服务器部署后云平台里,好的云平台里或许会帮助我们解决负载均衡和session同步的问题,但是云平台里有个问题很难解决那就是数据库的存储问题,如果我们使用的云平台发生了重大事故,导致云平台存储的数据丢失,这种会不会导致我们在云平台里数据库的信息也会丢失了,虽然这个事情的概率不高,但是发生这种事情的几率还是有的,虽然很多云平台都声称自己多么可靠,但是真实可靠性有多高不是局中人还真不清楚哦,因此使用云平台我们首要考虑的就是要做好数据备份,假如真发生了数据丢失,对于一个快速成长的网站而言可能非常致命。

写到这里一个婴儿般的网站就这样被我们创造出来了,我们希望网站能健康快速的成长,如果网站真的按我们预期成长了,那么一定会有一天我们制造的宝宝屋已经满足不了现实的需求,这个时候我们应该如何抉择了?换掉,全部换掉,使用新的架构例如我们以前长提的SOA架构,分布式技术,这个方法不错,但是SOA和分布式技术是很难的,成本是很高的,如果这时候我们通过添加几台服务器就能解决问题的话,我们绝对不要去选择什么分布式技术,因为这个成本太高了。上面我讲到几种session共享的方案,这个方案解决了应用的水平扩展问题,那么当我们网站出现瓶颈时候就多加几台服务器不就行了吗?那么这里就有个问题了,当网站成长很快,网站首先碰到的瓶颈到底是哪个方面的问题?

本人是做金融网站的,我们所做的网站有个特点就是当用户访问到我们所做的网站时候,目的都很明确就是为了付钱,用户到了我们所做的网站时候都希望能快点,再快点完成本网站的操作,很多用户在使用我们做的网站时候不太去关心网站的其他内容,因此我们所做的网站相对于数据库而言就是读写比例其实非常的均匀,甚至很多场景写比读要高,这个特点是很多专业服务网站的特点,其实这样的网站和企业开发的特点很类似:业务操作的重要度超过了业务展示的重要度,因此专业性网站吸纳企业系统开发的特点比较多。但是大部分我们日常常用的网站,我们逗留时间很长的网站按数据库角度而言往往是读远远大于写,例如大众点评网站它的读写比率往往是9比1。

12306或许是中国最著名的网站之一,我记得12306早期经常出现一个问题就是用户登录老是登不上,甚至在高峰期整个网站挂掉,页面显示503网站拒绝访问的问题,这个现象很好理解就是网站并发高了,大量人去登录网站,购票,系统挂掉了,最后所有的人都不能使用网站了。当网站出现503拒绝访问时候,那么这个网站就出现了最致命的问题,解决大用户访问的确是个超级难题,但是当高并发无法避免时候,整个网站都不能使用这个只能说网站设计上发生了致命错误,一个好的网站设计在应对超出自己能力的并发时候我们首先应该是不让他挂掉,因为这种结果是谁都不能使用,我们希望那些在可接受的请求下,让在可接受请求范围内的请求还是可以正常使用,超出的请求可以被拒绝,但是它们绝对不能影响到全网站的稳定性,现在我们看到了12306网站的峰值从未减少过,而且是越变越多,但是12306出现全站挂掉的问题是越来越少了。通过12036网站改变我们更进一步思考下网站的瓶颈问题。

排除一些不可控的因素,网站在高并发下挂掉的原因90%都是因为数据库不堪重负所致,而应用的瓶颈往往只有在解决了存储瓶颈后才会暴露,那么我们要升级网站能力的第一步工作就是提升数据库的承载能力,对于读远大于写的网站我们采取的方式就是将数据库从读写这个角度拆分,具体操作就是将数据库读写分离,如下图所示:

我们这时要设计两个数据库,一个数据库主要负责写操作我们称之为主库,一个数据库专门负责读操作我们称之为副库,副库的数据都是从主库导入的,数据库的读写分离可以有效的保证关键数据的安全性,但是有个缺点就是当用户浏览数据时候,读的数据都会有点延时,这种延时比起全站不可用那肯定是可以接受的。不过针对12306的场景,仅仅读写分离还是远远不够的,特别是负责读操作的副库,在高访问下也是很容易达到性能的瓶颈的,那么我们就得使用新的解决方案:使用分布式缓存,不过缓存的缺点就是不能有效的实时更新,因此我们使用缓存前首先要对读操作的数据进行分类,对于那些经常不发生变化的数据可以事先存放到缓存里,缓存的访问效率很高,这样会让读更加高效,同时也减轻了数据库的访问压力。至于用于写操作的主库,因为大部分网站读写的比例是严重失衡,所以让主库达到瓶颈还是比较难的,不过主库也有一个读的压力就是主库和副库的数据同步问题,不过同步时候数据都是批量操作,而不是像请求那样进行少量数据读取操作,读取操作特别多,因此想达到瓶颈还是有一定的难度的。听人说,美国牛逼的facebook对数据的任何操作都是事先合并为批量操作,从而达到减轻数据库压力的目的。

上面的方案我们可以保证在高并发下网站的稳定性,但是针对于读,如果数据量太大了,就算网站不挂掉了,用户能很快的在海量数据里检索到所需要的信息又成为了网站的一个瓶颈,如果用户需要很长时间才能获得自己想要的数据,很多用户会失去耐心从而放弃对网站的使用,那么这个问题又该如何解决了?

解决方案就是我们经常使用的百度,谷歌哪里得来,对于海量数据的读我们可以采用搜索技术,我们可以将数据库的数据导出到文件里,对文件建立索引,使用倒排索引技术来检索信息,我们看到了百度,谷歌有整个互联网的信息我们任然能很快的检索到数据,搜索技术是解决快速读取数据的一个有效方案,不过这个读取还是和数据库的读取有所区别的,如果用户查询的数据是通过数据库的主键字段,或者是通过很明确的建立了索引的字段来检索,那么数据库的查询效率是很高的,但是使用网站的人跟喜欢使用一些模糊查询来查找自己的信息,那么这个操作在数据库里就是个like操作,like操作在数据库里效率是很低的,这个时候使用搜索技术的优势就非常明显了,搜索技术非常适合于模糊查询操作。

OK,很晚了,关于存储的问题今天就写在这里,下一篇我将接着这个主题讲解,解决存储问题是很复杂的,下篇我尽量讲仔细点。

关于大型网站技术演进的思考(1):存储的瓶颈(上),首发于博客 - 伯乐在线

11 Nov 07:09

Real time insights into LinkedIn's performance using Apache Samza

by Jakob Homan

It's not easy to quickly gather all the data that goes into a LinkedIn page view, particularly for something like our home page. LinkedIn benefits from a very distributed service-oriented architecture for assembling pages as quickly possible while still being resilient to failures from any particular bit of content. Each bit that ends up on the page is provided by separate services, each of which often will call upon other, subsequent services in order to finish its work. This approach is great for building a reliable, scalable website, but does make it more challenging to get a holistic view of everything that goes into building those pages, since the effort was distributed across many machines operating independently.

Enter Apache Samza (incubating), which has allowed us to build a near real-time view of how pages are being built across hundreds of different services and thousands of machines. Once we have this data, it's easy for other teams at LinkedIn to ask questions such as, what services are call my service, why does my call take longer today compared to yesterday or, what is the biggest contributor to latency when loading the home page?

Samza is a distributed, real-time stream processing framework that was created at LinkedIn and is currently incubating with the Apache Software Foundation. Samza is very pluggable in what type of data sources it can read and write, but by default consumes from and produces to Apache Kafka, another Apache project out of LinkedIn. LinkedIn uses Kafka extensively to drive real-time site updates, populate metrics and monitoring systems and feed batch processes such as Hadoop. Samza is designed to consume Hadoop-scale data, but to do so on infinite, continuous streams with seconds to sub-second latencies. Martin Kleppmann has previously described the motivation and architecture of Samza, particularly at LinkedIn.

Assembling a page view in hundreds of easy steps

Consider what data is necessary to build a particular view of the LinkedIn home page. We provide interesting news via Pulse, timely updates from your connections in the Network Update Stream, potential new connections from People You May Know, advertisements targeted to your background, and much, much more.

Many services (approximated by simulated API calls here) are involved in completing one user request.

The front end service that handles that initial request creates dozens of parallel REST requests using LinkedIn's Rest.li framework. Each of those services can potentially make more such REST calls. The front end then assembles the results of those calls into the final data for the page. Every time one of these REST calls is made, the server sends a log of the request to Kafka. Each service publishes its logs to its own specific Kafka topic, which is named after the service, i.e. <service>_service_call. There are hundreds of these topics, one for each service and they share the same Avro schema, which allows them to be analyzed together. This schema includes timing information, who called whom, what was returned, etc, as well as the specific of what each particular service call did. Additionally log4j-style warnings and errors are also routed to Kafka in a separate <service>_log_event topic.

Each service that participates in fulfilling a request logs its activity to Kafka.

After a request has been satisfied, the complete record of all the work that went into generating it is scattered across the Kafka logs for each service that participated. These individual logs are great tools for evaluating the performance and correctness of the individual services themselves, and are carefully monitored by the service owners. But how can we use these individual elements to gain a larger view of the entire chain of calls that created that page? Such a perspective would allow us to see how the calls are interacting with each other, identify slow services or highlight redundant or unnecessary calls.

Many services are involved in fulfilling any front end request.

By creating a unique value or GUID for each call at the front end and propagating that value across all subsequent service calls, it's possible to tie them together and define a tree-structure of the calls starting from the front end all the way through to the leave service events. We call this value the TreeID and have built one of the first production Samza workflows at LinkedIn around it: the Call Graph Assembly (CGA) pipeline. All events involved in building the page now have such a TreeID, making it a powerful key on which to join data in new and fascinating ways.

A common TreeID amongst all calls from the same request allows those calls to be stitched together in a tree.

The CGA pipeline consists of two Samza jobs: the first repartitions the events coming from the sundry service call Kafka topics, creating a new key from their TreeIDs, while the second job assembles those repartitioned events into trees corresponding to the original calls from the front end request. This two-stage approach looks quite similar to the classic Map-Reduce approach where mappers will direct records to the correct reducer and those reducers then aggregate them together in some fashion. We expect this will be a common pattern in Samza jobs, particularly those that are implementing continuous, stream-based implementations of work that had previously been done in a batch fashion on Hadoop or similar situations.

Map-Reduce/Hadoop Samza
Filter, redirect records Mapper Repartition job
Process the grouped records Reduce Aggregation job
Map-Reduce style processing pattern for Samza

Job #1: Repartition on TreeId

The first job consumes from all of the service call topics as well as the log event topics from all services across LinkedIn. There are nearly a thousand of these topics and so, rather than manually trying to manage them all, we use Samza's RegExTopicGenerator, which allows users to wildcard their Kafka input topics, and specify *_service_call and *_log_event — a big time saver. Kafka messages are key-value pairs and Samza consumes them based on the hash partitioning on that key. The key of the incoming message is of no interest to this job, which replaces the key with the TreeID from the message itself. Thus the job repartitions the messages based on their TreeID, writing them to a new topic, all_service_calls. Additionally this job takes the opportunity to generate metrics about the incoming events, drop any which are malformed or lack a TreeID and, if so configured, drop some percentage in order to limit the volume of messages sent on to the next job. Similar partitioning and clean up work is done with the log events Kafka topics, which this job also consumes.

The Repartition on TreeID job has been an excellent opportunity to improve the scalability and performance of Samza. Currently it is consuming nearly ten thousand individual Kafka topic-partitions and must process those as quickly as possible in order to provide correct results to the subsequent job. If one container of the job is lagging relative to other containers, the events it emits will be out of sync and arrive late for aggregation in the next job. We've made numerous improvements to Samza to reach these performance requirements, the two largest of which are SAMZA-123, which gives Samza a more flexible partitioning model, and SAMZA-245, which refactored Samza's consumer code to increase throughput. At the moment we see the job processing 600,000 messages per second across all the partitions for the job and it is easily able to keep up with regular traffic surges throughout the day.

Additionally, this job demonstrated a need for Samza to be more flexible in how it groups together topic-partitions. Currently this grouping is based on the partitions of input Kafka topics; if those topics are partitioned 32 ways, there will exactly 32 instances of Samza processors (StreamTasks) and all topics, whether there are just one or a thousand, will be funneled into those 32 instances. This grouping is great if one needs to join on a key, but the Repartition on TreeID job has not such need and can benefit greatly from not being grouped. Instead, we should expand to as many StreamTasks as necessary to keep up with the flow of messages. The recently committed SAMZA-123 patch decouples Samza's processor assignment strategy from the number of input partitions. Jobs such as this one will be able to scale horizontally to a maximum of the number of individual topic partitions across the entire job, in this case potentially up to ten thousand separate StreamTasks.

Job #2: Assemble Call Graph

The second job in the CGA pipeline consumes the repartitioned service events and log events generated by the Repartition by TreeID job and builds complete trees of those events that correspond to the original service calls from the front end request. It effectively acts as a GROUP BY on the TreeID and is windowed to allow all of the component service calls and log events for a particular tree to arrive.

The Assemble Call Graph job consumes the consolidated feed from the Repartition By TreeID job.

The Assemble Call Graph job's approach to windowing is interesting because it needs to take into account any straggling messages either due to slowness in any of the underlying Kafka streams or the previous Repartition on TreeID job. The job maintains a timer for each TreeID that is started on arrival of the first event and is reset on each subsequent event's arrival. If this timer expires, meaning no new events for that TreeID have arrived during the duration, the tree is declared complete, assembled into a graph structure that hopefully mirrors the original series of calls way back at the front end, and emitted to a new topic, all_service_call_graphs. Experimentally we've found that a three minute window is suitable for catching the vast majority of call trees. This is an extremely conservative value, considering how quickly Kafka can process messages, but takes into delay potentially introduced as the events pass through several Kafka clusters and mirror maker instances on their way to our dedicated Kafka cluster.

To protect itself from services that may be re-using TreeIDs incorrectly or otherwise misbehaving, we cap the maximum number of service calls that may be in tree to one thousand and, should a TreeID hit this limit, it is blacklisted from the job for 24 hours and the elements we've received thus far are shunted off to another topic, cga_blacklist. Our performance team is quite keen to follow up on the TreeIDs that populate this topic to investigate these ne'er do well events.

Tools built atop the CGA pipeline output

Currently we are constructing many millions of call trees per day and this number increases constantly as we bring more services online. The output of this pipeline is consumed by both LinkedIn's performance and tools team to monitor the site in near real time. The Performance team in particular has built tools to aggregate the trees we create and compare them over time to find changes in their structure, laggard components in the tree and unexplained additions or deletions from it.

The Performance team at LinkedIn is building interactive tools atop the Call Graph Assembly job. The service names and resources have been redacted in this picture.

One use case is LinkedIn’s INCapacity, which consumes CGA events and constructs real-time callgraphs over thousands of servers for performance analysis and capacity planning. Performance analysis of a website on scale of LinkedIn is difficult even in the best of times. This task becomes extremely difficult when under intense pressure to fix a site performance issue. The callgraphs of INcapacity takes the guess work out of performance troubleshooting as they stack rank the call paths that are slowest and the services that are the longest poles.

Look for a blog post soon from the Performance team discussing, in part, further how they use the CGA to improve LinkedIn.

Conclusion

Previously it had been possible to laboriously build similar structures in Hadoop using Pig or Hive after all the of the data had been ETLed from the Kafka clusters. Now, all of the services that were involved in fulfilling a front end request can be visualized and explored seconds after that request is completed. As one of the first production Samza pipelines at LinkedIn, CGA has been an excellent opportunity to test, harden and improve Samza.

Acknowledgments

Many thanks to LinkedIn's performance, service infrastructure, and tools teams, in particular, Cuong Tran, Toon Sripatanaskul, Thomas Goetze, Badri Sridharan, Nick Baggott, Christopher Coleman and Jaroslaw Odzga.

SamzaStreamingPerformanceOpen Source
Author: 
Jakob Homan
Author's LinkedIn Profile URL: 
https://www.linkedin.com/in/jghoman
LinkedIn Since: 
December 2010
Author Avatar: 
Author Title: 
Staff Software Engineer
Content For: 
25 Sep 12:33

16个实用的在线工具

by wangjuanjuan

1. ExplainShell.com 命令解释

对于Linux用户来说每天都会写各种命令和脚本,那么你可以使用这个网站工具来查看命令式如何工作的,这样可以避免不必要的错误出现;也是一个很好的学习命令的方式

2. BashrcGenerator.com 定制个性命令提示符

简单说就是个性化生成命令提示符,可将生成的代码写入到用户家目录的.bashrc或者可以设置全局变量文件/etc/profile对所有用户生效
可参考:http://stackoverflow.com/questions/4133904/ps1-line-with-git-current-branch-and-colors

3. Vim-adventures.com 通过RPG游戏练习VIM使用

通过RPG游戏练习VIM编辑器的使用,使用h,j,k,l字符移动人物来获得新的命令能力和搜集钥匙,查看帮助可使用:help;赶脚这个非常cool!

4. Try Github 在线学习Git版本控制

十五分钟学会Git,很明显这个网站模拟了一个控制台,以很时尚的界面让人对Git不再望而生畏

5. Shortcutfoo.com

是一个练习快捷键的好地方,涵盖了vim、sublime、emacs、git等软件的快捷使用方式和友好的说明

6. GitHub Free Programming Books 免费编程书籍

以Github管理的方式搜集了免费的编程和系统管理等书籍,给作者点1024个赞~~,另外连接是fork原作者,后续增加中文书籍

7. Collabedit.com 实时文本交互聊天

先说下使用,你可以创建一个文档http://collabedit.com/yb22u填写相关的用户名和选择语言;然后可以将此文档地址发给另一个人,那么互相之间就可以实时看到对方的输入,有高亮语法;使用场合嘛,比如通过collabedit可以考量对方编程能力等

8. Cpp.sh 在线编写运行分享C++代码编辑器

可在线编辑运行C++代码,亦可Ctrl+Z生成url分享给好友

9. Copy.sh 浏览器运行虚拟机

又一个非常crazy的工具,在线运行虚拟机,可以选择下载虚拟机镜像也可以上传自己的iso,copy.sh在线运行虚拟机源码:https://github.com/copy/v86

10. Commandlinefu.com 命令或记录网站

做运维的应该都知道这个网站,可以分享自己的CLI库,也可以学习借鉴别人的命令脚本

11. Alias.sh 命令别名数据库

有点类似commandlinefu了,可以通过这个网站借鉴获取和分享有用的命令别名
比如lr别名定义了显示目录树

alias lr='ls -R | grep ":$" | sed -e '\''s/:$//'\'' -e '\''s/[^-][^\/]*\//--/g'\'' -e '\''s/^/   /'\'' -e '\''s/-/|/'\'''

12. Distrowatch.com 提供了Linux发行版的详细信息

通过Distrowath不仅可以精确的查看互联网都有哪些流行的Linux发行版,还可以查看每个发行版的相关信息如默认桌面环境、默认应用程序及镜像的下载链接;堪称Linux的数据库

13. Linuxmanpages.com 在线查看命令帮助

相当于系统内部的man、help、info等的综合吧

14. AwesomeCow.com 适用Linux环境的软件搜索引擎

如果有款win下好用的软件想在linux下使用,或许可以通过AwesomeCow找到与其类似或者一样的软件,或者通过WINE

15. PenguSpy.com Linux好玩游戏合集

16. Linux Cross Reference by Free Electrons 在线查看内核代码及不同版本的差异

对于内核开发者或许有很大的帮助

16个实用的在线工具,首发于博客 - 伯乐在线

28 Aug 11:54

SysAdmim 必备:系统性能大牛 Brendan Gregg 分享的 Linux 性能工具

by 黄利民

Brendan Gregg 目前是 Netflix 的高级性能架构师 ,他在那里做大规模计算机性能设计、分析和调优。他是《Systems Performance》等技术书的作者,因在系统管理员方面的成绩,获得过 2013年 USENIX LISA 大奖。他之前是 SUN 公司是性能领头人和内核工程师,他在 SUN 开发过 ZFS L2ARC,研究存储和网络性能。他也发明和开发过一大波性能分析工具,很多已集成到操作系统中了 。他的最近工作包括研究性能分析的方法论和可视化,其目标包括Linux内核。

上面这是 Gregg 的简介,正如其中说的,他个人站点上分享了很多 Linux 性能相关的资源,都是自己开发的:

 Linux observability tools | Linux 性能观测工具

 

 Linux benchmarking tools | Linux 性能测评工具

 

Linux tuning tools | Linux 性能调优工具

 

Linux observability sar

 

愈看更多图表和演讲,请移步:http://www.brendangregg.com/linuxperf.html

补充:去年在微博分享  Brendan Gregg 在 SCaLE 11x 大会上的演讲《Linux Performance Analysis and Tools | Linux 性能分析和工具》,这个链接(http://t.cn/zYHtxoL )中有演讲视频(Youtube)和幻灯片(SlideShare),所以请自带梯子访问。

SysAdmim 必备:系统性能大牛 Brendan Gregg 分享的 Linux 性能工具,首发于博客 - 伯乐在线

27 Feb 04:39

Spark:一个高效的分布式计算系统

by jzou
ps

spark

概述

什么是Spark

  • Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:spark-framwork

Spark与Hadoop的对比

  • Spark的中间数据放到内存中,对于迭代运算效率更高。
    • Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。
  • Spark比Hadoop更通用。
    • Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。
    • 这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。
    • 不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
  • 容错性。
    • 在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。
  • 可用性。
    • Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。

Spark与Hadoop的结合

  • Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。

Spark的适用场景

  • Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小
  • 由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
  • 总的来说Spark的适用面比较广泛且比较通用。

运行模式

  • 本地模式
  • Standalone模式
  • Mesoes模式
  • yarn模式

Spark生态系统

  • Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
  • Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。
  • Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。

在业界的使用

  • Spark项目在2009年启动,2010年开源, 现在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘宝等,豆瓣也在使用Spark的python克隆版Dpark。

Spark核心概念

Resilient Distributed Dataset (RDD)弹性分布数据集

  • RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。RDD必须是可序列化的。RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
  • RDD的特点:

    1. 它是在集群节点上的不可变的、已分区的集合对象。
    2. 通过并行转换的方式来创建如(map, filter, join, etc)。
    3. 失败自动重建。
    4. 可以控制存储级别(内存、磁盘等)来进行重用。
    5. 必须是可序列化的。
    6. 是静态类型的。
  • RDD的好处

    1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
    2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
    3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
    4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。
  • RDD的存储与分区

    1. 用户可以选择不同的存储级别存储RDD以便重用。
    2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
    3. RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。
  • RDD的内部表示
    在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

    1. 分区列表(数据块列表)
    2. 计算每个分片的函数(根据父RDD计算出此RDD)
    3. 对父RDD的依赖列表
    4. 对key-value RDD的Partitioner【可选】
    5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】
  • RDD的存储级别
    RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

    val NONE = new StorageLevel(false, false, false) 
        val DISK_ONLY = new StorageLevel(true, false, false) 
        val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) 
        val MEMORY_ONLY = new StorageLevel(false, true, true) 
        val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) 
        val MEMORY_ONLY_SER = new StorageLevel(false, true, false) 
        val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) 
        val MEMORY_AND_DISK = new StorageLevel(true, true, true) 
        val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) 
        val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) 
        val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

  • RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

RDD的生成

  • RDD有两种创建方式:
    1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。
    2、从父RDD转换得到新RDD。
  • 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:

    // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像 
        // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 
        def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { 
            hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
            classOf[Text], minSplits) .map(pair => pair._2.toString) }
    
        // 根据Hadoop配置,及InputFormat等创建HadoopRDD  
        new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

  • 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:

    // 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 
        reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
    
        val key: K = reader.createKey()
        val value: V = reader.createValue()
    
        //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。
        override def getNext() = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case eof: EOFException =>
            finished = true
        }
          (key, value)
        }

RDD的转换与操作

  • 对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。
  • 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
  • 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
  • 下面使用一个例子来示例说明Transformations与Actions在Spark的使用。

    val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), 
            Seq(System.getenv("SPARK_TEST_JAR")))
    
        val rdd_A = sc.textFile(hdfs://.....)
        val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1))
    
        val rdd_C = sc.textFile(hdfs://.....)
        val rdd_D = rdd_C.map(line => (line.substring(10), 1))
        val rdd_E = rdd_D.reduceByKey((a, b) => a + b)
    
        val rdd_F = rdd_B.jion(rdd_E)
    
        rdd_F.saveAsSequenceFile(hdfs://....)

SparkTA11

Lineage(血统)

  • 利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
  • RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

容错

  • 在RDD计算,通过checkpint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。

资源管理与作业调度

  • Spark对于资源管理与作业调度可以使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易,Spark on Yarn的大致框架图。 Spark架构图

  • 让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。


编程接口

  • Spark通过与编程语言集成的方式暴露RDD的操作,类似于DryadLINQ和FlumeJava,每个数据集都表示为RDD对象,对数据集的操作就表示成对RDD对象的操作。Spark主要的编程语言是Scala,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能(JVM上的静态强类型语言)。
  • Spark和Hadoop MapReduce类似,由Master(类似于MapReduce的Jobtracker)和Workers(Spark的Slave工作节点)组成。用户编写的Spark程序被称为Driver程序,Dirver程序会连接master并定义了对各RDD的转换与操作,而对RDD的转换与操作通过Scala闭包(字面量函数)来表示,Scala使用Java对象来表示闭包且都是可序列化的,以此把对RDD的闭包操作发送到各Workers节点。 Workers存储着数据分块和享有集群内存,是运行在工作节点上的守护进程,当它收到对RDD的操作时,根据数据分片信息进行本地化数据操作,生成新的数据分片、返回结果或把RDD写入存储系统。 runtime

Scala

  • Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:

    val sc = new SparkContext(master, appName, [sparkHome], [jars]) 
        val textFile = sc.textFile("hdfs://.....") 
        textFile.map(....).filter(.....).....

Java

  • Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:

    JavaSparkContext sc = new JavaSparkContext(...);  
        JavaRDD lines = ctx.textFile("hdfs://..."); 
        JavaRDD words = lines.flatMap( 
          new FlatMapFunction<String, String>() { 
             public Iterable call(String s) { 
                return Arrays.asList(s.split(" ")); 
             } 
           } 
        );

Python

  • 现在Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操作,从而实现使用python编写Spark程序。Spark也同样提供了pyspark,一个Spark的python shell,可以以交互式的方式使用Python编写Spark程序。 如:

    from pyspark import SparkContext 
        sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 
        words = sc.textFile("/usr/share/dict/words") 
        words.filter(lambda w: w.startswith("spar")).take(5)


使用示例

Standalone模式

  • 为方便Spark的推广使用,Spark提供了Standalone模式,Spark一开始就设计运行于Apache Mesos资源管理框架上,这是非常好的设计,但是却带了部署测试的复杂性。为了让Spark能更方便的部署和尝试,Spark因此提供了Standalone运行模式,它由一个Spark Master和多个Spark worker组成,与Hadoop MapReduce1很相似,就连集群启动方式都几乎是一样。
  • 以Standalone模式运行Spark集群

    • 下载Scala2.9.3,并配置SCALA_HOME
    • 下载Spark代码(可以使用源码编译也可以下载编译好的版本)这里下载 编译好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
    • 解压spark-0.7.3-prebuilt-cdh4.tgz安装包
    • 修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。

      SCALA_HOME=/home/spark/scala-2.9.3 
      JAVA_HOME=/home/spark/jdk1.6.0_45 
      SPARK_MASTER_IP=spark1             
      SPARK_MASTER_PORT=30111 
      SPARK_MASTER_WEBUI_PORT=30118 
      SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g 
      SPARK_WORKER_PORT=30333 
      SPARK_WORKER_WEBUI_PORT=30119 
      SPARK_WORKER_INSTANCES=1

    • 把Hadoop配置copy到conf目录下

    • 在master主机上对其它机器做ssh无密码登录

    • 把配置好的Spark程序使用scp copy到其它机器

    • 在master启动集群

      $SPARK_HOME/start-all.sh

yarn模式

  • Spark-shell现在还不支持Yarn模式,使用Yarn模式运行,需要把Spark程序全部打包成一个jar包提交到Yarn上运行。目录只有branch-0.8版本才真正支持Yarn。
  • 以Yarn模式运行Spark

    • 下载Spark代码.

      git clone git://github.com/mesos/spark

    • 切换到branch-0.8

      cd spark 
      git checkout -b yarn --track origin/yarn

    • 使用sbt编译Spark并

      $SPARK_HOME/sbt/sbt 
      > package 
      > assembly

    • 把Hadoop yarn配置copy到conf目录下

    • 运行测试

      SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ 
      ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ 
      --class spark.examples.SparkPi --args yarn-standalone

使用Spark-shell

  • Spark-shell使用很简单,当Spark以Standalon模式运行后,使用$SPARK_HOME/spark-shell进入shell即可,在Spark-shell中SparkContext已经创建好了,实例名为sc可以直接使用,还有一个需要注意的是,在Standalone模式下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell作为一个Spark程序一直运行在Spark上,其它的Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。
  • 在Spark-shell上写程序非常简单,就像在Scala Shell上写程序一样。

    scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") 
        textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
    
        scala> textFile.count() // Number of items in this RDD
        res0: Long = 21374
    
        scala> textFile.first() // First item in this RDD
        res1: String = # Spark

编写Driver程序

  • 在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。如WorkCount程序如下:

    import spark.SparkContext
    import SparkContext._
    
    object WordCount {
      def main(args: Array[String]) {
        if (args.length ==0 ){
          println("usage is org.test.WordCount <master>")
        }
        println("the args: ")
        args.foreach(println)
    
        val hdfsPath = "hdfs://hadoop1:8020"
    
        // create the SparkContext, args(0)由yarn传入appMaster地址
        val sc = new SparkContext(args(0), "WrodCount",
        System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
    
        val textFile = sc.textFile(hdfsPath + args(1))
    
        val result = textFile.flatMap(line => line.split("\\s+"))
            .map(word => (word, 1)).reduceByKey(_ + _)
    
        result.saveAsTextFile(hdfsPath + args(2))
      }
    }

参考资料