Flow - потоковая обработка данных

В разработке используются в основном 2 приема обработки данных: либо загрузка всех данных в оперативную память, либо обработка небольшими порциями по мере чтения. Первый способ прост и годится для небольших объемов, второй - сложнее, но является единственным решением при обработке больших объемов данных.

Оба подхода к обработке данных можно встретить, например, при обработке XML. К первому типу относится библиотека XML::LibXML. К потоковым относятся XML::Parser, XML::SAX, XML::ExtOn. На основе потоковой библиотеки XML::ExtOn построена, например, реализация Perl6::Pod.

Однако все больше приходится использовать потоковую обработку не только в отношении XML. И тут такие сущности, как пространства имен и тэги оказываются лишними. Для подобных случаев я создал библиотеку Flow [1]. По сути FLow - это XML::SAX::Machines [2] в максимально упрощенном виде: выброшен XML и SAX, а осталась лишь идея композиции - Machines ( то, что требуется для построения роботов! ).

Библиотека Flow является компонентом социального робота.

Простое должно оставаться простым

Концептуально библиотека Flow похожа на набор привычных утилит: grep, tail, head, которые используются ежедневно в командной строке. Они ориентированы на построчную потоковую обработку и будучи объединенными в pipe реализуют необходимую логику. Например:

cat test_file.txt | grep MyLine 

Данная команда выведет строки из файла, содержащие "MyLine".

Идея компоновки простых инструментов в "pipe" используется в Flow. Указанный ниже код реализует аналогичную логику:

  use Flow;
  my $f = create_flow( 
		Grep=>qr/MyLine/,
                sub {print @_; \@_}
	);
  open FH, "<test_file.txt";
  $p = $f->parser;
  $p->begin;
  while (my $str = <FH> ) {
   $p->flow($str)
  }
  $p->end;

Особенностью Flow, является то, что процесс обработки потока позволяет применять логику к буферу, содержащим несколько элементов данных.

my $f = create_flow( Splice=>20, Grep=>qr/MyLine/) 

Благодаря этому достигается компромисс в виде блочной обработки данных. Это позволяет снизить процессорные затраты на вызовы методов. К тому же варьируя размер блока данных ( Splice ) можно регулировать использование памяти.

Пример использования библиотеки Flow

Чтобы продемонстрировать возможности библиотеки, рассмотрим небольшую задачу. Пусть нам необходимо найти произведение чисел, хранящихся в файле. Содержимое файла следующее:

1
2
0
3
1
5

Таким образом имеем простой файл, в каждой строке которого находится число. Код, решающий эту задачу, выглядит следующим образом:

  use Flow;
  my $res = 1;
  my $f = create_flow(
            #Печатаем каждую прочитанную строку
	    sub { print "process:",@_,"\n" ;\@_},
            #Результат произведения накапливаем в 
            # переменной $res
	    sub { $res *= $_ for @_}
	);
  open FH, "<test_file.txt";
  $p = $f->parser;
  $p->begin;
  while (my $str = <FH> ) {
   $p->flow($str)
  }
  $p->end;
  # вывод результата 
  print "$res \n";

Результат работы скрипта следующий:

process:1
process:2
process:0
process:3
process:1
process:5
result: 0

Результат произведения чисел равен 0. Для этого была прочитана каждая строка файла и произведено действие умножения.

В приведенном примере обработка завершиться как только будет достигнут конец файла. Для нашей задачи результат становится очевидным, если будет встречен 0, так как если среди множителей есть 0, то и произведение будет равно 0. Дальнейшая выборка чисел и обработка не нужна.

Особенностью Flow является обратная связь, существующая между циклом формирования потока данных и элементами, обрабатывающими этот поток. Возможна ситуация, когда дальнейшая обработка не имеет смысла. Например, необходимая информация была найдена или встречены данные, при которых продолжение вычислений теряет смысл ( как в рассмотренном примере ).

Реализуется данный механизм исходя из следующего соглашения. Если блок кода или метод объекта в последовательности обработчиков возвращает undef или ссылку на массив, то производится дальнейшая обработка данных. Иначе данный результат рассматривается как особый и возвращается в инициирующий поток цикл. В нашем примере таким циклом является построчное чтение из файла.

C учетом описанных особенностей модифицируем код примера:

  use Flow;
  my $res = 1;
  my $f = create_flow(
	    sub { print "process:",@_,"\n" ;\@_},
	    sub { 
		for (@_) {
	   #Возращаем 1 как признак особого случая
		$res *= $_  or return 1 
			}
		\@_ # передача данных далее "по цепочке"
		}
	);
  open FH, "<test_file.txt"; 
  $p = $f->parser;
  $p->begin;
  while (my $str = <FH> ) {
  # прекращаем дальнейшую обработку 
   last if $p->flow($str)
  }
  $p->end;
  # вывод результата 
  print "$res \n";

Результат будет следующим:

process:1
process:2
process:0
result: 0

Как видим возможно управлять процессом формирования потока данных из компонент, обрабатывающих сам поток данных.

Именно наличие подобной управляющей обратной связи [3], позволяет достигать оптимальных результатов при обработке данных.

Данный пример прост, но общее понимание принципа работы библиотеки Flow поможет при ее использовании в более сложных задачах.

[1]Flow - библиотека для обработки потока данных. http://search.cpan.org/perldoc?Flow

[2]Introducing XML::SAX::Machines. http://www.xml.com/pub/a/2002/02/13/sax-machines.html

[3] Обратная связь. http://ru.wikipedia.org/wiki/Обратная_свзяь. Наиболее полную информацию об обратных связях можно почерпнуть из курса по "Автоматизированным Системам Управления".